Untitled

 avatar
unknown
plain_text
20 days ago
18 kB
5
Indexable
due_generation/DueGenerator.py:
import traceback
import pandas as pd

from due_generation.due_generation_factory.dues_factory import DuesFactory
from logger.handler import Logger
from rabbitmq_setup.producer import publish_events
from repayment_allocation.repayment_allocator.repayment_allocation import AllocateExcessRepayments
from util import config
from util.models.carry_info import CarryInfo
from util.models.charge_summary import ChargeSummary
from util.models.charges import ChargesConfig
from util.models.repayment_summary import RepaymentSummary
from util.string_constants import BATCH_ID, EVENTS, AS_OF_DATE, REPAYMENT_SUMMARY, UNPAID_INSTALMENTS, \
    EXCESS_ALLOCATION, LOAN_ID, PRODUCT_PARTNERSHIP_ID, COMPUTE_RESULT, ALLOCATIONS, LOAN_STATES, \
    PRODUCT_ID, FUNDING_ALLOCATION, CHARGES_DUE, CHARGES_SUMMARY, CHARGES_SUMMARY_LIST, CARRY_INFO,CHARGES_CONFIG
from util.util_functions import (convert_to_dataframe, get_lms_config_df, convert_to_json, nack_message,
                                 get_colending_type, check_empty_dataframe)

logger = Logger()
import json

def generate_dues(channel, method, _header_frame, body, args, connection):
    """Handle one due-generation message and build its response payload.

    Reads the batch id, events and as-of date from ``body``, derives the
    repayment, charges, carry-info and unpaid-instalment frames from the
    event columns, runs the dues and cumulative-dues calculators and, when
    repayments are present, the excess-repayment allocator. The response is
    only logged here: the ack / publish / nack calls are commented out.

    Args:
        channel: Not used while the ack/nack calls are commented out.
        method: Not used while the ack/nack calls are commented out.
        _header_frame: Not used.
        body: Mapping indexed with ``BATCH_ID``, ``EVENTS`` and ``AS_OF_DATE``.
        args: Stored as ``ehsClient``; referenced only by the commented-out
            publish / nack calls.
        connection: Not used.

    Returns:
        None. Any ``Exception`` raised while processing is logged and
        swallowed.
    """
    try:
        ehsClient = args
        payload = body
        batch_id = payload[BATCH_ID]
        logger.info("Received input for due generation for the batch id : {}".format(batch_id))
        logger.debug(payload)
        events = payload[EVENTS]
        as_of_date = payload[AS_OF_DATE]
        events_df = convert_to_dataframe(events)

        # Rows with FUNDING_ALLOCATION == 1 form the customer schedule;
        # every other row is handed over as the partner schedule.
        customer_rps = events_df[events_df[FUNDING_ALLOCATION] == 1]
        partner_rps = events_df[events_df[FUNDING_ALLOCATION] != 1]

        # Repayment summary: flatten the non-null entries, or fall back to
        # an empty frame carrying the expected columns.
        if REPAYMENT_SUMMARY in events_df.columns:
            exploded_repayments = events_df[REPAYMENT_SUMMARY].explode()
            present_repayments = exploded_repayments[exploded_repayments.notna()]
            repayment_dataframe = pd.json_normalize(present_repayments)
        else:
            repayment_dataframe = pd.DataFrame(columns=RepaymentSummary().repayment_summary_keys)

        # Helper-call order is kept exactly as-is: the helpers are opaque
        # from here and may depend on it.
        lms_config_df = get_lms_config_df(events_df[events_df[FUNDING_ALLOCATION] == 1])
        colending_config = get_lms_config_df(events_df)
        colending_config.drop_duplicates([LOAN_ID, PRODUCT_PARTNERSHIP_ID, PRODUCT_ID], inplace=True)
        colending_config = get_colending_type(colending_config, customer_rps)
        repayment_dataframe = get_colending_type(repayment_dataframe, customer_rps)
        lms_config_df.drop_duplicates([LOAN_ID, PRODUCT_PARTNERSHIP_ID], inplace=True)

        # Charges summary is optional in the input.
        charges_summary = pd.DataFrame(columns=ChargeSummary().charge_summary_keys)
        if CHARGES_SUMMARY_LIST in events_df.columns:
            exploded_charges = events_df[CHARGES_SUMMARY_LIST].explode()
            charges_summary = convert_to_dataframe(exploded_charges)
            charges_summary = check_empty_dataframe(charges_summary, ChargeSummary().charge_summary_keys)

        # Charges config is read unconditionally (no column-presence check).
        charges_config = convert_to_dataframe(events_df[CHARGES_CONFIG].explode())
        charges_config = check_empty_dataframe(charges_config, ChargesConfig().charges_config)

        if CARRY_INFO in events_df.columns:
            carry_info = pd.json_normalize(events_df[CARRY_INFO].explode())
            carry_info = check_empty_dataframe(carry_info, CarryInfo().carry_info_keys)
        else:
            carry_info = pd.DataFrame(columns=CarryInfo().carry_info_keys)

        if UNPAID_INSTALMENTS in events_df.columns:
            exploded_unpaid = events_df[UNPAID_INSTALMENTS].explode()
            unpaid_instalments = pd.json_normalize(exploded_unpaid[exploded_unpaid.notna()])
        else:
            unpaid_instalments = pd.DataFrame()

        # Configure the factory, then compute the per-instalment dues.
        dues_factory = DuesFactory()
        dues_factory.dues_dataframe = events_df
        dues_factory.as_of_date = as_of_date
        dues_factory.carry_info = carry_info
        dues_factory.batch_id = batch_id
        dues_factory.charge_summary = charges_summary
        dues_calculator = dues_factory.generate_dues()
        due_entries, charges_due, charges_summary, carry_info = dues_calculator.generate_dues_data()

        if not colending_config.empty:
            due_entries = due_entries.merge(
                colending_config,
                on=[LOAN_ID, PRODUCT_PARTNERSHIP_ID, PRODUCT_ID],
            )

        cumulative_calculator = dues_factory.calculate_cumulative_dues()
        cumulative_dues = cumulative_calculator.calculate_cumulative_dues(
            due_entries=due_entries,
            charges_summary=charges_summary,
            charges_config=charges_config,
        )

        # The allocator is constructed even when there are no repayments,
        # exactly as before; it is only invoked when repayments exist.
        excess_payment_allocator = AllocateExcessRepayments(
            repayment_dataframe=repayment_dataframe,
            config=lms_config_df,
            dues_dataframe=due_entries,
            unpaid_instalments=unpaid_instalments,
            loan_state=cumulative_dues,
            as_of_date=as_of_date,
            batch_id=batch_id,
            repayment_purpose=EXCESS_ALLOCATION,
            colending_config=colending_config,
            partner_rps=partner_rps,
            customer_rps=customer_rps,
            repayment_allocation_logic=None,
        )

        if repayment_dataframe.empty:
            compute_result = {
                CHARGES_DUE: convert_to_json(charges_due),
                CHARGES_SUMMARY: convert_to_json(charges_summary),
                ALLOCATIONS: convert_to_json(due_entries),
                LOAN_STATES: convert_to_json(cumulative_dues),
                CARRY_INFO: convert_to_json(carry_info),
            }
            excess_payment_response = {COMPUTE_RESULT: compute_result}
        else:
            excess_payment_response = excess_payment_allocator.allocate_repayments()

        excess_payment_response[BATCH_ID] = batch_id
        excess_payment_response[AS_OF_DATE] = as_of_date
        logger.info("Generated the dues and pushing the response to EHS.")
        logger.debug(excess_payment_response)
        #channel.basic_ack(delivery_tag=method.delivery_tag)
        #publish_events(config.RESULT_ROUTING_KEY, excess_payment_response, ehsClient, batch_id)
    except Exception:
        logger.exception(traceback.format_exc())
        # Broker error observed when the acknowledgement is sent too late:
        #   (406, 'PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out.
        #   Timeout value used: 1800000 ms. This timeout value can be configured,
        #   see consumers doc guide to learn more')
        # channel is available here. use the ehs client for saving the erroneous packet
        #nack_message(channel, method.delivery_tag, ehs_client=ehsClient, batch_id=batch_id)

# Local debugging harness: loads a sample message from disk and feeds it to
# generate_dues with every broker-related argument set to None.
# NOTE(review): this runs whenever the module is imported, not only when it is
# executed as a script - consider an `if __name__ == "__main__":` guard.
with open('sample3.json') as f:
    inp = json.load(f)
generate_dues(None, None, None, inp, None, None)


--------------------------------------------
 replay/replay_trigger.py

import json
import traceback

import pandas as pd

from logger.handler import Logger
from replay.replay_calculation_factory.replay_calculation_factory import ReplayCalculationFactory
from util.models.accrual import Accrual
from util.models.carry_info import CarryInfo
from util.models.charge_summary import ChargeSummary
from util.models.charges import ChargesAccrual, ChargesConfig, IncidentalCharges, ChargesDue
from util.models.colending import Colending
from util.models.collection import Collection
from util.models.disbursement import Disbursement
from util.models.interest_rate import InterestRate
from util.models.lender_collection import LenderCollection
from util.models.lender_collection_info import LenderCollectionInfo
from util.models.loan_state import LoanState
from util.models.moratorium_data import Moratorium
from util.models.repayment_allocation import RepaymentAllocation
from util.models.repayment_schedule import RepaymentSchedule
from util.models.repayment_summary import RepaymentSummary
from util.models.transfer_data import TransferData
from util.string_constants import BATCH_ID, START_DATE, END_DATE, EVENTS, DISBURSEMENT, RPS, \
    ACCRUAL, ALLOCATIONS, REPAYMENT_SUMMARY, CHARGES_ACCRUAL, CHARGES_SUMMARY, CHARGES_DUE, LOAN_ID, INTEREST_RATES, \
    LOAN_STATES, REPLAY_ID, config_util, LENDER_COLLECTION_INFO, LENDER_COLLECTION, LENDER_TRANSFER_DATA, \
    COLENDING_COLLECTION, TRANSFER_DATA, COLLECTION_INFO, CARRY_INFO, MORATORIUMS
from util.util_functions import convert_to_dataframe, check_empty_dataframe, ack_message, nack_message

logger = Logger()


def _normalized_frame(dataframe, column, expected_keys, optional=False):
    """Explode one list-valued events column into a flat DataFrame.

    Args:
        dataframe: Events frame returned by ``convert_to_dataframe``.
        column: Name of the column to explode and normalise.
        expected_keys: Forwarded to ``check_empty_dataframe`` together with
            the normalised frame.
        optional: When True, a missing column yields an empty frame instead
            of raising ``KeyError``.

    Returns:
        The result of ``check_empty_dataframe`` for the normalised frame.
    """
    if optional and column not in dataframe.columns:
        frame = pd.DataFrame()
    else:
        frame = pd.json_normalize(dataframe[column].explode())
    return check_empty_dataframe(frame, expected_keys)


def trigger_replay(channel, method, _header_frame, body, args, connection):
    """Handle one replay message: build the input frames and run the replay.

    Each list-valued column of the events frame is exploded into its own
    DataFrame and handed to ``ReplayCalculationFactory``, whose
    ``trigger_replay`` is then invoked. The nack call in the error path is
    commented out, so failures are only logged.

    Args:
        channel: Not used while the nack calls are commented out.
        method: Not used while the nack calls are commented out.
        _header_frame: Not used.
        body: Mapping read with ``.get`` for the batch id, replay id, start
            and end dates, loan id and events.
        args: Stored as ``ehsClient`` and passed to the factory as
            ``ehs_client``.
        connection: Not used.

    Returns:
        None. Any ``Exception`` raised while processing is logged and
        swallowed.
    """
    # Bound before the ``try`` so the error handler can always read them,
    # even when the failure happens before they are assigned below.
    replay_id = None
    replay_ids = None
    try:
        ehsClient = args
        batch_id = body.get(BATCH_ID)
        replay_id = body.get(REPLAY_ID)
        start_date = body.get(START_DATE)
        end_date = body.get(END_DATE)
        logger.info(
            f"Received input to trigger replay for loan id: {body.get(LOAN_ID)} and batch_id: {batch_id} with start date as {start_date} and end_date as {end_date}.")
        logger.debug(body)
        data = body.get(EVENTS)
        dataframe = convert_to_dataframe(data)

        batching_enabled = config_util.getProperty('REPLAY_BATCHING') == "True"
        if batching_enabled:
            # NOTE(review): in batching mode the factory receives the whole
            # loan-to-replay-id frame as ``replay_id``, whereas the
            # non-batching path passes a one-element list - confirm intended.
            replay_id = pd.json_normalize(dataframe['loanToReplayIdMap'].explode())
            replay_ids = list(replay_id[REPLAY_ID].unique())
        else:
            replay_id = [body.get(REPLAY_ID)]

        # The config frame is the only one not passed through
        # check_empty_dataframe.
        config_dataframe = pd.json_normalize(dataframe['lmsConfig'].explode())

        disbursement_dataframe = _normalized_frame(dataframe, DISBURSEMENT, Disbursement().disbursement_keys)
        colending_dataframe = _normalized_frame(dataframe, 'colending', Colending().colending_keys)
        interest_rate_dataframe = _normalized_frame(dataframe, INTEREST_RATES, InterestRate().interest_rate)
        rps = _normalized_frame(dataframe, RPS.lower(), RepaymentSchedule().repayment_schedule_keys)
        accrual_dataframe = _normalized_frame(dataframe, ACCRUAL, Accrual().accrual_keys)
        allocations_dataframe = _normalized_frame(dataframe, ALLOCATIONS,
                                                  RepaymentAllocation().repayment_allocation_keys)
        repayment_summary = _normalized_frame(dataframe, REPAYMENT_SUMMARY,
                                              RepaymentSummary().repayment_summary_keys)
        charges_accrual = _normalized_frame(dataframe, CHARGES_ACCRUAL, ChargesAccrual().charges_accrual_key)
        charges_summary = _normalized_frame(dataframe, CHARGES_SUMMARY, ChargeSummary().charge_summary_keys)
        loan_state = _normalized_frame(dataframe, LOAN_STATES, LoanState().loan_state_keys)
        charges_config = _normalized_frame(dataframe, 'chargesConfig', ChargesConfig().charges_config)
        incidental_charges = _normalized_frame(dataframe, 'incidentalCharges',
                                               IncidentalCharges().incidental_charges)
        charges_due = _normalized_frame(dataframe, CHARGES_DUE, ChargesDue().charges_due_key, optional=True)
        repayment_collection = _normalized_frame(dataframe, 'repaymentCollection', Collection().collection_keys)
        manual_allocations = _normalized_frame(dataframe, 'manualAllocations',
                                               RepaymentAllocation().manual_allocation_keys, optional=True)
        lender_collection = _normalized_frame(dataframe, LENDER_COLLECTION,
                                              LenderCollection().lender_collection_keys)
        transfer_data = _normalized_frame(dataframe, TRANSFER_DATA, TransferData().transfer_data_keys)
        collection_info = _normalized_frame(dataframe, COLLECTION_INFO,
                                            LenderCollectionInfo().lender_collection_info_keys)
        moratorium_data = _normalized_frame(dataframe, MORATORIUMS, Moratorium().morat_keys, optional=True)

        # Carry info differs from the other optional columns: when the
        # column is missing it starts from a frame that already carries the
        # expected columns rather than from a bare empty frame.
        carry_info = pd.DataFrame(columns=CarryInfo().carry_info_keys)
        if CARRY_INFO in dataframe.columns:
            carry_info = pd.json_normalize(dataframe[CARRY_INFO].explode())
        carry_info = check_empty_dataframe(carry_info, CarryInfo().carry_info_keys)

        replay_calculation_factory = ReplayCalculationFactory(batch_id=batch_id, start_date=start_date,
                                                              end_date=end_date,
                                                              config_df=config_dataframe,
                                                              disbursement_df=disbursement_dataframe,
                                                              colending_df=colending_dataframe,
                                                              interest_rate_df=interest_rate_dataframe, rps_df=rps,
                                                              accrual_df=accrual_dataframe,
                                                              allocations_df=allocations_dataframe,
                                                              repayment_summary_df=repayment_summary,
                                                              charges_accrual_df=charges_accrual,
                                                              charges_summary_df=charges_summary,
                                                              loan_state_df=loan_state,
                                                              charges_config=charges_config,
                                                              incidental_charges=incidental_charges,
                                                              charges_due=charges_due,
                                                              repayment_collection=repayment_collection,
                                                              manual_allocations=manual_allocations,
                                                              ehs_client=ehsClient, replay_id=replay_id,
                                                              lender_collection=lender_collection,
                                                              transfer_data=transfer_data,
                                                              lender_collection_info=collection_info,
                                                              carry_info=carry_info,
                                                              morat_data=moratorium_data
                                                              )
        replay_calculation_factory.trigger_replay(None, None)
        logger.info("The replay is successful and pushed the response to EHS.")
    except Exception:
        logger.exception(traceback.format_exc())
        # ``replay_ids`` is only populated in batching mode. Checking it
        # directly (instead of re-reading the config and assuming the name is
        # bound) keeps this handler from raising UnboundLocalError when the
        # failure happened before the ids were extracted.
        if replay_ids is not None:
            replay_id = replay_ids
        # channel.basic_nack(delivery_tag=method.delivery_tag, multiple=False, requeue=False)
        #nack_message(channel, method.delivery_tag, ehs_client=ehsClient, replay_id=replay_id)

# Local debugging harness: loads a sample message from disk and feeds it to
# trigger_replay with every broker-related argument set to None.
# NOTE(review): this runs whenever the module is imported, not only when it is
# executed as a script - consider an `if __name__ == "__main__":` guard.
with open('sample3.json') as f:
    inp = json.load(f)
trigger_replay(None, None, None, inp, None, None)



---------------------------------------
util/config_util.py

import time

from kazoo.exceptions import SessionExpiredError

from zookeeper_setup.zookeeper_client import ZookeeperClient

# from kazoo.client import DataWatch

#zClient = ZookeeperClient()
# Prefix prepended to a property name to form its ZooKeeper key.
serviceName = "/services/omniservicingbusiness/"
# Module-level cache of property values, keyed by bare property name.
properties = dict()


class ConfigUtil:
    """Read-through accessor for service properties cached in ``properties``.

    All ZooKeeper access is currently commented out, so ``getProperty`` can
    only return values that are already present in the module-level cache.
    """

    def __init__(self):
        self.load_full_tree(serviceName, properties)

    def load_full_tree(self, serviceName="", propertiesMap=None):
        """Load every property under ``serviceName`` and print the time taken.

        The ZooKeeper traversal is commented out, so at the moment this only
        prints the (near-zero) elapsed time.

        @param serviceName: root path whose children would be loaded.
        @param propertiesMap: mapping to populate; a fresh dict when None.
        @return: None
        """
        if propertiesMap is None:
            propertiesMap = dict()
        start = time.perf_counter()
        # for child in zClient.zk.get_children(serviceName):
        #     propertiesMap[child] = self.getProperty(child)
        # Elapsed time is end minus start; the previous ``start - now`` form
        # reported a negative duration.
        elapsed = time.perf_counter() - start
        print("time taken : ", str(elapsed))

    def pathWatcher(self, event):
        """Watch callback: re-read the property the event refers to.

        @param event: indexable event whose item at index 2 is converted to a
            string and used as the property key (presumably the znode path of
            a kazoo WatchedEvent - TODO confirm).
        @return: None
        """
        keyName = str(event[2])
        # getProperty prepends serviceName itself and the cache is keyed by
        # the bare property name, so strip the service prefix when present.
        if keyName.startswith(serviceName):
            keyName = keyName[len(serviceName):]
        self.getProperty(keyName=keyName, reset=True)

    def getProperty(self, keyName, reset=False):
        """Return the value of ``keyName``, or None when it is unavailable.

        @param keyName: bare property name (without the service prefix).
        @param reset: when True, skip the cached value.
        @return: the cached value, or None. With the ZooKeeper lookup
            commented out, ``reset=True`` always yields None and leaves the
            cache untouched.
        """
        # Only referenced by the commented-out ZooKeeper lookup below.
        zKeyName = serviceName + keyName
        if keyName in properties and not reset:
            return properties[keyName]
        #try:
            #if zClient.zk.exists(zKeyName):
            #    data, stat = zClient.zk.get(zKeyName, watch=self.pathWatcher)
            #    if data is not None:
            #        properties[keyName] = data.decode("utf-8")
            #        return properties[keyName]
        #except SessionExpiredError:
        #    print("Zookeeper session expired. Trying to connect again!")
        #    zClient.zk.stop()
        #    zClient.zk.start()
        #    self.getProperty(keyName)
        return None


Editor is loading...
Leave a Comment