Untitled

 avatar
unknown
plain_text
2 years ago
20 kB
8
Indexable
from ast import literal_eval as ast_literal_eval
from pandas import DataFrame as pd_DataFrame
from pandas import read_sql as pd_read_sql
from pandas import Series as pd_Series
from json import loads as json_loads
from os import environ as os_environ
import numpy as np

def fetchData(__dbConnection, apilogger, args, configProperties, entity, dataset, pdLkupData):
    __ln_trkr = 1900
    __returnCode = '0000'
    __colsRenameCass = ''
    __df_dataExtract = pd_DataFrame()
    pdLkupData_bk = pdLkupData.copy()
    
    try:
        __ln_trkr = 1905
        if len(pdLkupData) == 0:
            raise Exception('Previous dataFetch failed')
        
        # Variable assignment
        __ln_trkr = 1910
        apilogger.info(('%s|args: %s' % (__ln_trkr,args)))
        __opId = args["opId"]
        __buId = args["buId"]
        __batchId = args["batchId"]
        __logPath = os_environ.get('ANALYTICSDATASERVICELOGPATH') + '/logs/'
        
        # Configuration values assignment
        __ln_trkr = 1915
        __sourceDbType = configProperties.get('config','sourcedatabase')
        bill_grp_config = json_loads(configProperties.get(entity, dataset+'-common-billgrp'))
        __query = configProperties.get(entity, dataset+'-'+__sourceDbType+'.query')
        dbType = configProperties.get('config','sourcedatabase')

        __ln_trkr = 1920
        import cx_Oracle
        username = configProperties.get('tibrpts-'+dbType,'username')
        password = configProperties.get('tibrpts-'+dbType,'password')
        connectstring =  configProperties.get('tibrpts-'+dbType,'connectstring')
        apilogger.info(('%s|Connection to : %s' % (username,password)))
        dbConnection = cx_Oracle.connect(username,password,connectstring)
        
        __ln_trkr = 1921 #monthly_service_fee
        df_monthly = pdLkupData[(pdLkupData['revnt_event_type']=='RC')]
        if len(df_monthly) == 0 :
            pdLkupData['monthly_service_fee'] = 0.0
        else :
            df_monthly = df_monthly.drop_duplicates(['account_id', 'invoice_num','event_seq_nbr'], keep='first')
            df_monthly = (df_monthly.groupby(['account_id','invoice_num']).agg({'account_id':lambda x: x.nunique(), 'revnt_amt_exl_tax': 'sum'}).rename(columns={'account_id':'grouped_account_id','invoice_num':'grouped_invoice_num','revnt_amt_exl_tax':'monthly_service_fee'})).reset_index()
            pdLkupData = pdLkupData.merge(df_monthly,left_on=['account_id','invoice_num'],right_on=['account_id','invoice_num'],how = 'left')
        apilogger.info(('%s|__Grouping Completed  (monthly_service_fee)   : %s' % (__ln_trkr, len(pdLkupData))))

        __ln_trkr = 1922 #usg
        df_usg = pdLkupData[(pdLkupData['revnt_event_type']=='USAGE')]
        if len(df_usg) == 0 :
            pdLkupData['usg'] = 0.0
        else :
            df_usg = df_usg.drop_duplicates(['account_id', 'invoice_num','event_seq_nbr'], keep='first')
            df_usg = (df_usg.groupby(['account_id','invoice_num']).agg({'account_id':lambda x: x.nunique(), 'revnt_amt_exl_tax': 'sum'}).rename(columns={'account_id':'grouped_account_id','invoice_num':'grouped_invoice_num','revnt_amt_exl_tax':'usg'})).reset_index()
            pdLkupData = pdLkupData.merge(df_usg,left_on=['account_id','invoice_num'],right_on=['account_id','invoice_num'],how = 'left')
        apilogger.info(('%s|__Grouping Completed  (usg)   : %s' % (__ln_trkr, len(pdLkupData))))

        __ln_trkr = 1923 #oct
        df_oct = pdLkupData[(pdLkupData['revnt_event_type']=='USAGE') & (pdLkupData['bill_line_item_code']=='IDD')]
        if len(df_oct) == 0 :
            pdLkupData['oct'] = 0.0
        else :
            df_oct = df_oct.drop_duplicates(['account_id', 'invoice_num','event_seq_nbr'], keep='first') 
            df_oct = (df_oct.groupby(['account_id','invoice_num']).agg({'account_id':lambda x: x.nunique(), 'revnt_amt_tax': 'sum'}).rename(columns={'account_id':'grouped_account_id','invoice_num':'grouped_nvoice_num','revnt_amt_tax':'oct'})).reset_index()
            pdLkupData = pdLkupData.merge(df_oct,left_on=['account_id','invoice_num'],right_on=['account_id','invoice_num'],how = 'left')
        apilogger.info(('%s|__Grouping Completed  (oct)   : %s' % (__ln_trkr, len(pdLkupData))))

        __ln_trkr = 1924 #otc
        df_otc = pdLkupData[(pdLkupData['revnt_event_type']=='NRC')]
        if len(df_otc) == 0 :
            pdLkupData['otc'] = 0.0
        else :
            #df_oct = df_oct.drop_duplicates(['account_id', 'invoice_num','event_seq_nbr'], keep='first')
            df_otc = (df_otc.groupby(['account_id','invoice_num']).agg({'account_id':lambda x: x.nunique(), 'revnt_amt_exl_tax': 'sum'}).rename(columns={'account_id':'grouped_account_id','invoice_num':'grouped_invoice_num','revnt_amt_exl_tax':'otc'})).reset_index()
            pdLkupData = pdLkupData.merge(df_otc,left_on=['account_id','invoice_num'],right_on=['account_id','invoice_num'],how = 'left')
        apilogger.info(('%s|__Grouping Completed  (otc)   : %s' % (__ln_trkr, len(pdLkupData))))

        __ln_trkr = 192410 #ref
        df_ref = pdLkupData[(pdLkupData['revnt_event_type']=='REF')]
        if len(df_ref) == 0 :
            pdLkupData['refund'] = 0.0
            pdLkupData['refundTax'] = 0.0
        else :
            df_ref = df_ref.drop_duplicates(['account_id', 'invoice_num','event_seq_nbr'], keep='first')
            df_ref_tax = df_ref
            df_ref = (df_ref.groupby(['account_id','invoice_num']).agg({'account_id':lambda x: x.nunique(), 'revnt_totalamount': 'sum'}).rename(columns={'account_id':'grouped_account_id','invoice_num':'grouped_invoice_num','revnt_totalamount':'refund'})).reset_index()
            pdLkupData = pdLkupData.merge(df_ref,left_on=['account_id','invoice_num'],right_on=['account_id','invoice_num'],how = 'left') 
            apilogger.info(('%s|__Grouping Completed  (Refund)   : %s' % (__ln_trkr, len(pdLkupData))))

            df_ref_tax = (df_ref_tax.groupby(['account_id','invoice_num']).agg({'account_id':lambda x: x.nunique(), 'revnt_amt_tax': 'sum'}).rename(columns={'account_id':'grouped_account_id','invoice_num':'grouped_invoice_num','revnt_amt_tax':'refundTax'})).reset_index()
            pdLkupData = pdLkupData.merge(df_ref_tax,left_on=['account_id','invoice_num'],right_on=['account_id','invoice_num'],how = 'left')
            apilogger.info(('%s|__Grouping Completed  (Refund Tax)   : %s' % (__ln_trkr, len(pdLkupData))))

        apilogger.info(('%s|__Grouping Completed  (Refund)   : %s' % (__ln_trkr, len(pdLkupData))))

        __ln_trkr = 192411 #payment
        df_payment = pdLkupData[(pdLkupData['revnt_event_type']=='PAYM') | (pdLkupData['revnt_event_type']=='CAPM')]
        df_payment = df_payment.drop_duplicates(['account_id', 'invoice_num','revnt_uom','revnt_event_type'], keep='first')
        #df_payment.to_csv(sep='|', header=True, index=False, path_or_buf='/app/server/HOBS-AnalyticsDataService/logs/rmsCreateAcct_payments'+args["batchId"]+'__192411.csv')
        if len(df_payment) == 0 :
            pdLkupData['payment_received'] = 0.0
        else :
            df_payment = (df_payment.groupby(['account_id','invoice_num']).agg({'account_id':lambda x: x.nunique(), 'revnt_amt_exl_tax': 'sum'}).rename(columns={'account_id':'grouped_account_id','invoice_num':'grouped_invoice_num','revnt_amt_exl_tax':'payment_received'})).reset_index()
            pdLkupData = pdLkupData.merge(df_payment,left_on=['account_id','invoice_num'],right_on=['account_id','invoice_num'],how = 'left')
        apilogger.info(('%s|__Grouping Completed  (payments)   : %s' % (__ln_trkr, len(pdLkupData))))


        __ln_trkr = 192412 #adjustment
        df_adjustment = pdLkupData[(pdLkupData['revnt_event_type']=='ADJ')]
        #df_adjustment.to_csv(sep='|', header=True, index=False, path_or_buf='/app/server/HOBS-AnalyticsDataService/logs/rmsCreateAcct'+args["batchId"]+'__192412.csv')
        if len(df_adjustment) == 0 :
            pdLkupData['credit_adjustment'] = 0.0
            pdLkupData['debit_adjustment'] = 0.0
        else :
            df_adjustment_credit = pdLkupData[(pdLkupData['revnt_service_type_id']=='CREDIT')]
            df_adjustment_debit = pdLkupData[(pdLkupData['revnt_service_type_id']=='DEBIT')]
            #df_adjustment_debit.to_csv(sep='|', header=True, index=False, path_or_buf='/app/server/HOBS-AnalyticsDataService/logs/rmsCreateAcctDebit'+args["batchId"]+'__192412.csv')
            
            if len(df_adjustment_credit) == 0 :
                pdLkupData['credit_adjustment'] = 0.0
            else :
                df_adjustment_credit = (df_adjustment_credit.groupby(['account_id','invoice_num']).agg({'account_id':lambda x: x.nunique(), 'revnt_totalamount': 'sum'}).rename(columns={'account_id':'grouped_account_id','invoice_num':'grouped_invoice_num','revnt_totalamount':'credit_adjustment'})).reset_index()
                pdLkupData = pdLkupData.merge(df_adjustment_credit,left_on=['account_id','invoice_num'],right_on=['account_id','invoice_num'],how = 'left')
            apilogger.info(('%s|__Grouping Completed  (Credit Adj)   : %s' % (__ln_trkr, len(pdLkupData))))
            
            if len(df_adjustment_debit) == 0 :
                pdLkupData['debit_adjustment'] = 0.0
            else : 
                df_adjustment_debit = (df_adjustment_debit.groupby(['account_id','invoice_num']).agg({'account_id':lambda x: x.nunique(), 'revnt_amt_exl_tax': 'sum'}).rename(columns={'account_id':'grouped_account_id','invoice_num':'grouped_invoice_num','revnt_amt_exl_tax':'debit_adjustment'})).reset_index()
                pdLkupData = pdLkupData.merge(df_adjustment_debit,left_on=['account_id','invoice_num'],right_on=['account_id','invoice_num'],how = 'left')
            apilogger.info(('%s|__Grouping Completed  (Debit Adj)   : %s' % (__ln_trkr, len(pdLkupData))))
            #pdLkupData.to_csv(sep='|', header=True, index=False, path_or_buf='/app/server/HOBS-AnalyticsDataService/logs/rmsCreateAcctAfterDebit'+args["batchId"]+'__192412.csv')

        __ln_trkr = 192413#nrcdeposit
        df_nrcdeposit = pdLkupData[(pdLkupData['revnt_event_type']=='NRC') & (pdLkupData['rpci_component_type']=='DEPOSIT')]
        if len(df_nrcdeposit) == 0 :
            pdLkupData['deposit'] = 0.0
        else :
            df_nrcdeposit = df_nrcdeposit.drop_duplicates(['account_id', 'invoice_num','event_seq_nbr'], keep='first') 
            df_nrcdeposit = (df_nrcdeposit.groupby(['account_id','invoice_num']).agg({'account_id':lambda x: x.nunique(), 'revnt_amt_exl_tax': 'sum'}).rename(columns={'account_id':'grouped_account_id','invoice_num':'grouped_invoice_num','revnt_amt_exl_tax':'deposit'})).reset_index()
            pdLkupData = pdLkupData.merge(df_nrcdeposit,left_on=['account_id','invoice_num'],right_on=['account_id','invoice_num'],how = 'left')
        apilogger.info(('%s|__Grouping Completed  (deposit)   : %s' % (__ln_trkr, len(pdLkupData))))
           

        __ln_trkr = 192413
        pdLkupData['total_amount_due'] = pdLkupData['total_amount_due'].replace(np.nan,0.0)
        #pdLkupData['total_current_charges'] = pdLkupData['total_current_charges'].replace(np.nan,0.0)
        pdLkupData['monthly_service_fee'] = pdLkupData['monthly_service_fee'].replace(np.nan,0.0)
        pdLkupData['usg'] = pdLkupData['usg'].replace(np.nan,0.0)
        pdLkupData['oct'] = pdLkupData['oct'].replace(np.nan,0.0)
        pdLkupData['discount'] = pdLkupData['discount'].replace(np.nan,0.0)

        __ln_trkr = 192414
        pdLkupData['installation_address'] = 'NOT APPLICABLE'
        pdLkupData['tax_adj_vat'] = 0
        pdLkupData['tax_adj_oct'] = 0
        #pdLkupData['age'] = 0
       
        __ln_trkr = 192415
        pdLkupData['oct'].fillna(0.0, inplace=True)
        pdLkupData['vat'].fillna(0.0, inplace=True)
        pdLkupData['refundTax'].fillna(0.0, inplace=True)

        pdLkupData['oct'] = pdLkupData['oct'].replace(np.nan,0.0)
        pdLkupData['vat'] = pdLkupData['vat'].replace(np.nan,0.0)
        pdLkupData['refundTax'] = pdLkupData['refundTax'].replace(np.nan,0.0)

        __ln_trkr = 1924151
        pdLkupData['oct'] = pdLkupData['oct'].astype(float, errors='ignore')
        
        __ln_trkr = 1924152
        apilogger.info(('%s| Datatype of column oct : %s' % (__ln_trkr, pdLkupData['oct'].dtypes)))
        apilogger.info(('%s| Datatype of column vat : %s' % (__ln_trkr, pdLkupData['vat'].dtypes)))
        pdLkupData['vat'] = pdLkupData['vat'] - pdLkupData['oct'] - pdLkupData['refundTax']

        
        __ln_trkr = 192416
        pdLkupData['balance_brought_forward'].fillna(0.0, inplace=True)
        pdLkupData['payment_received'].fillna(0.0, inplace=True)
        pdLkupData['debit_adjustment'].fillna(0.0, inplace=True)
        pdLkupData['credit_adjustment'].fillna(0.0, inplace=True)
        pdLkupData['refund'].fillna(0.0, inplace=True)
        
        pdLkupData['balance_brought_forward'] = pdLkupData['balance_brought_forward'].replace(np.nan,0.0)
        pdLkupData['payment_received'] = pdLkupData['payment_received'].replace(np.nan,0.0)
        pdLkupData['credit_adjustment'] = pdLkupData['credit_adjustment'].replace(np.nan,0.0)
        pdLkupData['debit_adjustment'] = pdLkupData['debit_adjustment'].replace(np.nan,0.0)
        pdLkupData['refund'] = pdLkupData['refund'].replace(np.nan,0.0)

        pdLkupData['total_balance_from_last_bill'] = pdLkupData['balance_brought_forward'] + pdLkupData['payment_received'] + pdLkupData['credit_adjustment']
        apilogger.info(('%s|__Grouping Completed  (total BBF)   : %s' % (__ln_trkr, len(pdLkupData))))
       

        __ln_trkr = 192417
        pdLkupData['otc'].fillna(0.0, inplace=True)
        pdLkupData['deposit'].fillna(0.0, inplace=True)

        pdLkupData['otc'] = pdLkupData['otc'].replace(np.nan,0.0)
        pdLkupData['deposit'] = pdLkupData['deposit'].replace(np.nan,0.0)

        pdLkupData['otc'] = pdLkupData['otc'] - pdLkupData['deposit']
        
        __ln_trkr = 192418
        pdLkupData['total_current_charges'] = pdLkupData['monthly_service_fee'] +  pdLkupData['usg'] + pdLkupData['otc'] + pdLkupData['discount'] + pdLkupData['vat'] + pdLkupData['oct'] + pdLkupData['debit_adjustment'] + pdLkupData['deposit']
        
        __ln_trkr = 192419
        pdLkupData['account_number'] = pdLkupData['account_id']
 
        __ln_trkr = 192420 #Printable
        bill_grp_dict = bill_grp_config
        df_billgrp = pd_DataFrame(bill_grp_dict.items(),columns=['bill_grp','printable'])
        pdLkupData = pdLkupData.merge(df_billgrp,left_on=['grp'],right_on=['bill_grp'],how = 'left')

        __ln_trkr = 192421
        #pdLkupData.loc[(pdLkupData['monthly_service_fee'] + pdLkupData['usg'] ) == 0, 'printable'] = 'NO'
        pdLkupData.loc[(pdLkupData['total_current_charges']) <= 0, 'printable'] = 'NO'

        __ln_trkr = 192422
        pdLkupData['printable'].fillna('YES', inplace=True)
        pdLkupData['printable'] = pdLkupData['printable'].replace(np.nan,'YES')

        pdLkupData['discount'] = pdLkupData['discount'].replace(np.nan,0.0)
        pdLkupData['currency'] = pdLkupData['currency'].replace(np.nan,'NA')
        pdLkupData['credit_limit'] = pdLkupData['credit_limit'].replace(np.nan,0.0)
        pdLkupData['account_status'] = pdLkupData['account_status'].replace(np.nan,'NA')
        pdLkupData['tax_profile'] = pdLkupData['tax_profile'].replace(np.nan,'NA')
        pdLkupData['customer_number'] = pdLkupData['customer_number'].replace(np.nan,'NA')
        pdLkupData['account_name'] = pdLkupData['account_name'].replace(np.nan,'NA')
        pdLkupData['subsidiary_code'] = pdLkupData['subsidiary_code'].replace(np.nan,'NA')
        pdLkupData['customer_name'] = pdLkupData['customer_name'].replace(np.nan,'NA')
        pdLkupData['account_manager'] = pdLkupData['account_manager'].replace(np.nan,'NA')
        pdLkupData['financial_accountant'] = pdLkupData['financial_accountant'].replace(np.nan,'NA')
        pdLkupData['subscriber_category'] = pdLkupData['subscriber_category'].replace(np.nan,'NA')
        pdLkupData['subscriber_sub_category'] = pdLkupData['subscriber_sub_category'].replace(np.nan,'NA')
        pdLkupData['billing_address'] = pdLkupData['billing_address'].replace(np.nan,'NA')
        pdLkupData['billing_region'] = pdLkupData['billing_region'].replace(np.nan,'NA')
        pdLkupData['grp'] = pdLkupData['grp'].replace(np.nan,'NA')

        __ln_trkr = 192423
        pdLkupData.to_csv(sep='|', header=True, index=False, path_or_buf='/app/server/HOBS-AnalyticsDataService/logs/Account_Lvl_Before_Drop.csv')
        
        __ln_trkr = 1925
        __df_dataExtract = pd_read_sql('SELECT * FROM TIBRPTS.ETPI_INV_DTL_REP_ACCT_LVL WHERE ROWNUM=1',con = dbConnection)
        __df_dataExtract.columns = __df_dataExtract.columns.str.lower()
        
        __ln_trkr = 1030
        __colsExtracted = list(__df_dataExtract)
        apilogger.info(('%s|__colsExtracted for Acc Lvl : %s' % (__ln_trkr, __colsExtracted)))
        __data_cols = list(pdLkupData)
        apilogger.info(('%s|__dataframe_columns     : %s' % (__ln_trkr, __data_cols)))
        __remove_columns = [item for item in __data_cols if item not in __colsExtracted]
        if len(__remove_columns) > 0:
            apilogger.info(('%s|__remove_columns     : %s' % (__ln_trkr, ','.join(__remove_columns))))
            pdLkupData.drop(__remove_columns, axis=1, inplace=True)
        else:
            apilogger.info(('%s|__remove_columns     : No extra columns to remove' % __ln_trkr))        

        __ln_trkr = 1935
        apilogger.info(('%s|__csv_cassandra_columns     : %s' % (__ln_trkr, pdLkupData.columns)))
        __fileName = 'ETPI_INV_DTL_ACC_LVL_RPT'
        __data_csv = __logPath + __fileName + '_' + str(args["batchId"])+'.csv'
        pdLkupData.to_csv(sep='|', header=True, index=False, path_or_buf=__data_csv)

        __fileName = 'ETPI_INV_DTL_ACC_LVL_RPT_Drop_Duplicate'
        __data_csv = __logPath + __fileName + '_' + str(args["batchId"])+'.csv'
        pdLkupData = pdLkupData.drop_duplicates(['account_number', 'invoice_number'], keep='first')
        pdLkupData.to_csv(sep='|', header=True, index=False, path_or_buf=__data_csv)

        apilogger.info(('%s| Final DF taype : %s' % (__ln_trkr, pdLkupData.dtypes)))

        __ln_trkr = 1936
        acct_lvl_records = [tuple(x) for x in pdLkupData[['bill_date','customer_number','customer_name','account_number','account_name','account_status','currency','tax_profile','subscriber_category','subscriber_sub_category','billing_region','credit_limit','subsidiary_code','billing_address','installation_address','account_manager','financial_accountant','grp','invoice_number','balance_brought_forward','payment_received','credit_adjustment','debit_adjustment','tax_adj_vat','tax_adj_oct','age','total_balance_from_last_bill','monthly_service_fee','usg','discount','vat','oct','otc','refund','deposit','total_current_charges','total_amount_due','bl_status','printable']].values]
        __cursor = dbConnection.cursor()
        __cursor.executemany(__query,acct_lvl_records)
        dbConnection.commit()


        __ln_trkr = 1999
        apilogger.info(('%s|Count: %s' % (__ln_trkr, len(__df_dataExtract))))
        
    except Exception as __exp:
        apilogger.error(('%s|Error|%s' % (__ln_trkr, str(__exp))))
        __returnCode = '9999'
    return (__returnCode, pdLkupData_bk)


if __name__ == '__main__':
    fetchData(__dbConnection, apilogger, args, configProperties, entity, dataset, pdLkupData)
Editor is loading...