Untitled
unknown
plain_text
2 years ago
20 kB
8
Indexable
from ast import literal_eval as ast_literal_eval
from pandas import DataFrame as pd_DataFrame
from pandas import read_sql as pd_read_sql
from pandas import Series as pd_Series
from json import loads as json_loads
from os import environ as os_environ
import numpy as np

# Every derived amount is summed per (account, invoice) pair and merged back on it.
_GROUP_KEYS = ['account_id', 'invoice_num']
# Rows sharing these keys are treated as one event and only the first is summed.
_EVENT_DEDUPE_KEYS = ['account_id', 'invoice_num', 'event_seq_nbr']
_PAYMENT_DEDUPE_KEYS = ['account_id', 'invoice_num', 'revnt_uom', 'revnt_event_type']

# Defaults applied to missing values just before the frame is exported.
# Iteration order matches the order the columns are filled in.
_FINAL_DEFAULTS = {
    'printable': 'YES',
    'discount': 0.0,
    'currency': 'NA',
    'credit_limit': 0.0,
    'account_status': 'NA',
    'tax_profile': 'NA',
    'customer_number': 'NA',
    'account_name': 'NA',
    'subsidiary_code': 'NA',
    'customer_name': 'NA',
    'account_manager': 'NA',
    'financial_accountant': 'NA',
    'subscriber_category': 'NA',
    'subscriber_sub_category': 'NA',
    'billing_address': 'NA',
    'billing_region': 'NA',
    'grp': 'NA',
}

# Column order of the tuples handed to executemany(); it must line up with the
# bind positions of the configured insert query.
_ACCT_LVL_INSERT_COLUMNS = [
    'bill_date', 'customer_number', 'customer_name', 'account_number',
    'account_name', 'account_status', 'currency', 'tax_profile',
    'subscriber_category', 'subscriber_sub_category', 'billing_region',
    'credit_limit', 'subsidiary_code', 'billing_address',
    'installation_address', 'account_manager', 'financial_accountant',
    'grp', 'invoice_number', 'balance_brought_forward', 'payment_received',
    'credit_adjustment', 'debit_adjustment', 'tax_adj_vat', 'tax_adj_oct',
    'age', 'total_balance_from_last_bill', 'monthly_service_fee', 'usg',
    'discount', 'vat', 'oct', 'otc', 'refund', 'deposit',
    'total_current_charges', 'total_amount_due', 'bl_status', 'printable',
]


def _fill_missing(frame, columns, value):
    """Replace missing values with *value* in each of *columns* (column must exist)."""
    for column in columns:
        frame[column] = frame[column].fillna(value)


def _merge_grouped_sum(frame, rows, value_col, out_col, apilogger, step, label,
                       dedupe_cols=None):
    """Return *frame* with a new *out_col* holding per-invoice sums of *value_col*.

    *rows* is the subset of *frame* to aggregate. When it is empty, *out_col*
    is added as a constant 0.0. Otherwise *rows* is optionally de-duplicated
    on *dedupe_cols*, summed per (account_id, invoice_num) and left-merged
    back, so pairs without matching rows get NaN (filled later by the caller).
    A new frame is always returned; the input frame is never modified.
    """
    if len(rows) == 0:
        return frame.assign(**{out_col: 0.0})
    if dedupe_cols is not None:
        rows = rows.drop_duplicates(dedupe_cols, keep='first')
    # Only the summed column is carried over. Merging an extra helper column on
    # every step makes the merge suffixes collide after a few merges.
    totals = rows.groupby(_GROUP_KEYS)[value_col].sum().rename(out_col).reset_index()
    merged = frame.merge(totals, on=_GROUP_KEYS, how='left')
    apilogger.info(('%s|__Grouping Completed (%s) : %s' % (step, label, len(merged))))
    return merged


def fetchData(__dbConnection, apilogger, args, configProperties, entity, dataset, pdLkupData):
    """Build the account-level invoice report rows and insert them into Oracle.

    Per (account_id, invoice_num) the function sums the event amounts found in
    *pdLkupData* by event type (RC, USAGE, NRC, REF, PAYM/CAPM, ADJ, deposits),
    derives the totals, dumps intermediate CSV files and bulk-inserts the
    result with the query configured for *entity*/*dataset*.

    Parameters:
        __dbConnection: not used; the function opens its own cx_Oracle
            connection from the ``tibrpts-<sourcedatabase>`` config section.
        apilogger: object exposing ``info`` and ``error``.
        args: mapping that must contain ``opId``, ``buId`` and ``batchId``.
        configProperties: object whose ``get(section, option)`` returns strings.
        entity, dataset: select the bill-group mapping and insert query options.
        pdLkupData: DataFrame produced by the previous step; left unmodified.

    Returns:
        ``(return_code, frame)`` where *return_code* is ``'0000'`` on success
        or ``'9999'`` if any exception occurred (it is logged, not raised),
        and *frame* is a copy of *pdLkupData* taken before processing.
    """
    step = 1900  # progress marker; reported in the error log on failure
    return_code = '0000'
    lookup_backup = pdLkupData.copy()
    connection = None
    try:
        step = 1905
        if len(pdLkupData) == 0:
            raise Exception('Previous dataFetch failed')

        # Variable assignment
        step = 1910
        apilogger.info(('%s|args: %s' % (step, args)))
        # opId/buId are not used further; the lookups keep the fail-fast
        # behaviour on incomplete args.
        _op_id, _bu_id, batch_id = args["opId"], args["buId"], args["batchId"]
        log_path = os_environ.get('ANALYTICSDATASERVICELOGPATH') + '/logs/'

        # Configuration values assignment
        step = 1915
        source_db_type = configProperties.get('config', 'sourcedatabase')
        bill_grp_config = json_loads(configProperties.get(entity, dataset + '-common-billgrp'))
        insert_query = configProperties.get(entity, dataset + '-' + source_db_type + '.query')

        step = 1920
        import cx_Oracle
        db_section = 'tibrpts-' + source_db_type
        username = configProperties.get(db_section, 'username')
        password = configProperties.get(db_section, 'password')
        connectstring = configProperties.get(db_section, 'connectstring')
        # Never write the password to the log.
        apilogger.info(('%s|Connection to : %s@%s' % (step, username, connectstring)))
        connection = cx_Oracle.connect(username, password, connectstring)

        frame = pdLkupData
        event_type = frame['revnt_event_type']

        step = 1921  # monthly_service_fee
        frame = _merge_grouped_sum(
            frame, frame[event_type == 'RC'], 'revnt_amt_exl_tax',
            'monthly_service_fee', apilogger, step, 'monthly_service_fee',
            dedupe_cols=_EVENT_DEDUPE_KEYS)

        step = 1922  # usg
        frame = _merge_grouped_sum(
            frame, frame[frame['revnt_event_type'] == 'USAGE'], 'revnt_amt_exl_tax',
            'usg', apilogger, step, 'usg', dedupe_cols=_EVENT_DEDUPE_KEYS)

        step = 1923  # oct
        idd_usage = frame[(frame['revnt_event_type'] == 'USAGE')
                          & (frame['bill_line_item_code'] == 'IDD')]
        frame = _merge_grouped_sum(
            frame, idd_usage, 'revnt_amt_tax', 'oct', apilogger, step, 'oct',
            dedupe_cols=_EVENT_DEDUPE_KEYS)

        step = 1924  # otc (NRC rows are intentionally not de-duplicated)
        frame = _merge_grouped_sum(
            frame, frame[frame['revnt_event_type'] == 'NRC'], 'revnt_amt_exl_tax',
            'otc', apilogger, step, 'otc')

        step = 192410  # refund and refund tax, both from the same REF rows
        refunds = frame[frame['revnt_event_type'] == 'REF']
        frame = _merge_grouped_sum(
            frame, refunds, 'revnt_totalamount', 'refund', apilogger, step,
            'Refund', dedupe_cols=_EVENT_DEDUPE_KEYS)
        frame = _merge_grouped_sum(
            frame, refunds, 'revnt_amt_tax', 'refundTax', apilogger, step,
            'Refund Tax', dedupe_cols=_EVENT_DEDUPE_KEYS)

        step = 192411  # payment
        payments = frame[(frame['revnt_event_type'] == 'PAYM')
                         | (frame['revnt_event_type'] == 'CAPM')]
        frame = _merge_grouped_sum(
            frame, payments, 'revnt_amt_exl_tax', 'payment_received', apilogger,
            step, 'payments', dedupe_cols=_PAYMENT_DEDUPE_KEYS)

        step = 192412  # adjustment
        adjustments = frame[frame['revnt_event_type'] == 'ADJ']
        if len(adjustments) == 0:
            frame = frame.assign(credit_adjustment=0.0, debit_adjustment=0.0)
        else:
            # NOTE(review): CREDIT/DEBIT rows are taken from the whole frame,
            # not only from the ADJ rows, exactly as before. Confirm whether
            # they should be restricted to `adjustments`.
            credit_rows = frame[frame['revnt_service_type_id'] == 'CREDIT']
            debit_rows = frame[frame['revnt_service_type_id'] == 'DEBIT']
            frame = _merge_grouped_sum(
                frame, credit_rows, 'revnt_totalamount', 'credit_adjustment',
                apilogger, step, 'Credit Adj')
            frame = _merge_grouped_sum(
                frame, debit_rows, 'revnt_amt_exl_tax', 'debit_adjustment',
                apilogger, step, 'Debit Adj')

        step = 192413  # nrcdeposit
        deposits = frame[(frame['revnt_event_type'] == 'NRC')
                         & (frame['rpci_component_type'] == 'DEPOSIT')]
        frame = _merge_grouped_sum(
            frame, deposits, 'revnt_amt_exl_tax', 'deposit', apilogger, step,
            'deposit', dedupe_cols=_EVENT_DEDUPE_KEYS)

        step = 192413
        _fill_missing(frame, ['total_amount_due', 'monthly_service_fee', 'usg',
                              'oct', 'discount'], 0.0)

        step = 192414
        frame['installation_address'] = 'NOT APPLICABLE'
        frame['tax_adj_vat'] = 0
        frame['tax_adj_oct'] = 0

        step = 192415
        _fill_missing(frame, ['oct', 'vat', 'refundTax'], 0.0)
        step = 1924151
        frame['oct'] = frame['oct'].astype(float, errors='ignore')
        step = 1924152
        apilogger.info(('%s| Datatype of column oct : %s' % (step, frame['oct'].dtypes)))
        apilogger.info(('%s| Datatype of column vat : %s' % (step, frame['vat'].dtypes)))
        # The incoming vat also contains the oct and refund tax; strip both out.
        frame['vat'] = frame['vat'] - frame['oct'] - frame['refundTax']

        step = 192416
        _fill_missing(frame, ['balance_brought_forward', 'payment_received',
                              'debit_adjustment', 'credit_adjustment', 'refund'], 0.0)
        frame['total_balance_from_last_bill'] = (
            frame['balance_brought_forward']
            + frame['payment_received']
            + frame['credit_adjustment'])
        apilogger.info(('%s|__Grouping Completed (total BBF) : %s' % (step, len(frame))))

        step = 192417
        _fill_missing(frame, ['otc', 'deposit'], 0.0)
        # Deposits are NRC rows too, so they are in otc; report them separately.
        frame['otc'] = frame['otc'] - frame['deposit']

        step = 192418
        frame['total_current_charges'] = (
            frame['monthly_service_fee'] + frame['usg'] + frame['otc']
            + frame['discount'] + frame['vat'] + frame['oct']
            + frame['debit_adjustment'] + frame['deposit'])

        step = 192419
        frame['account_number'] = frame['account_id']

        step = 192420  # Printable flag per bill group, taken from configuration
        bill_groups = pd_DataFrame(list(bill_grp_config.items()),
                                   columns=['bill_grp', 'printable'])
        frame = frame.merge(bill_groups, left_on=['grp'], right_on=['bill_grp'], how='left')

        step = 192421
        frame.loc[frame['total_current_charges'] <= 0, 'printable'] = 'NO'

        step = 192422
        for column, default in _FINAL_DEFAULTS.items():
            frame[column] = frame[column].fillna(default)

        step = 192423
        frame.to_csv(sep='|', header=True, index=False,
                     path_or_buf='/app/server/HOBS-AnalyticsDataService/logs/Account_Lvl_Before_Drop.csv')

        step = 1925
        # One-row probe used only to learn the target table's column names.
        table_probe = pd_read_sql(
            'SELECT * FROM TIBRPTS.ETPI_INV_DTL_REP_ACCT_LVL WHERE ROWNUM=1',
            con=connection)
        table_probe.columns = table_probe.columns.str.lower()

        step = 1030  # tracker value kept unchanged although out of sequence
        target_columns = list(table_probe)
        apilogger.info(('%s|__colsExtracted for Acc Lvl : %s' % (step, target_columns)))
        frame_columns = list(frame)
        apilogger.info(('%s|__dataframe_columns : %s' % (step, frame_columns)))
        known_columns = set(target_columns)
        extra_columns = [name for name in frame_columns if name not in known_columns]
        if extra_columns:
            apilogger.info(('%s|__remove_columns : %s' % (step, ','.join(extra_columns))))
            frame = frame.drop(extra_columns, axis=1)
        else:
            apilogger.info(('%s|__remove_columns : No extra columns to remove' % step))

        step = 1935
        apilogger.info(('%s|__csv_cassandra_columns : %s' % (step, frame.columns)))
        report_csv = log_path + 'ETPI_INV_DTL_ACC_LVL_RPT' + '_' + str(batch_id) + '.csv'
        frame.to_csv(sep='|', header=True, index=False, path_or_buf=report_csv)

        deduped_csv = (log_path + 'ETPI_INV_DTL_ACC_LVL_RPT_Drop_Duplicate'
                       + '_' + str(batch_id) + '.csv')
        frame = frame.drop_duplicates(['account_number', 'invoice_number'], keep='first')
        frame.to_csv(sep='|', header=True, index=False, path_or_buf=deduped_csv)
        apilogger.info(('%s| Final DF taype : %s' % (step, frame.dtypes)))

        step = 1936
        acct_lvl_records = [tuple(row) for row in frame[_ACCT_LVL_INSERT_COLUMNS].values]
        cursor = connection.cursor()
        try:
            cursor.executemany(insert_query, acct_lvl_records)
            connection.commit()
        finally:
            cursor.close()

        step = 1999
        # Report the number of rows sent to the database.
        apilogger.info(('%s|Count: %s' % (step, len(acct_lvl_records))))
    except Exception as exp:
        apilogger.error(('%s|Error|%s' % (step, str(exp))))
        return_code = '9999'
    finally:
        # Release the connection on both paths; a failed close must not mask
        # the outcome already recorded in return_code.
        if connection is not None:
            try:
                connection.close()
            except Exception as close_exp:
                apilogger.error(('%s|Error|closing connection: %s' % (step, str(close_exp))))
    return (return_code, lookup_backup)


if __name__ == '__main__':
    # NOTE(review): none of these names are defined in this file; presumably
    # they are injected by whatever executes this module as __main__. Confirm.
    fetchData(__dbConnection, apilogger, args, configProperties, entity, dataset, pdLkupData)
Editor is loading...