Untitled
unknown
plain_text
20 days ago
18 kB
5
Indexable
due_generation/DueGenerator.py: import traceback import pandas as pd from due_generation.due_generation_factory.dues_factory import DuesFactory from logger.handler import Logger from rabbitmq_setup.producer import publish_events from repayment_allocation.repayment_allocator.repayment_allocation import AllocateExcessRepayments from util import config from util.models.carry_info import CarryInfo from util.models.charge_summary import ChargeSummary from util.models.charges import ChargesConfig from util.models.repayment_summary import RepaymentSummary from util.string_constants import BATCH_ID, EVENTS, AS_OF_DATE, REPAYMENT_SUMMARY, UNPAID_INSTALMENTS, \ EXCESS_ALLOCATION, LOAN_ID, PRODUCT_PARTNERSHIP_ID, COMPUTE_RESULT, ALLOCATIONS, LOAN_STATES, \ PRODUCT_ID, FUNDING_ALLOCATION, CHARGES_DUE, CHARGES_SUMMARY, CHARGES_SUMMARY_LIST, CARRY_INFO,CHARGES_CONFIG from util.util_functions import (convert_to_dataframe, get_lms_config_df, convert_to_json, nack_message, get_colending_type, check_empty_dataframe) logger = Logger() import json def generate_dues(channel, method, _header_frame, body, args, connection): try: ehsClient = args input = body batch_id = input[BATCH_ID] logger.info("Received input for due generation for the batch id : {}".format(batch_id)) logger.debug(input) data = input[EVENTS] as_of_date = input[AS_OF_DATE] df = convert_to_dataframe(data) customer_rps = df[df[FUNDING_ALLOCATION] == 1] partner_rps = df[df[FUNDING_ALLOCATION] != 1] if REPAYMENT_SUMMARY in df.columns: repayment_dataframe = df[REPAYMENT_SUMMARY].explode() repayment_dataframe = pd.json_normalize(repayment_dataframe[repayment_dataframe.notna()]) else: repayment_dataframe = pd.DataFrame(columns=RepaymentSummary().repayment_summary_keys) lms_config_df = get_lms_config_df(df[df[FUNDING_ALLOCATION] == 1]) colending_config = get_lms_config_df(df) colending_config.drop_duplicates([LOAN_ID, PRODUCT_PARTNERSHIP_ID, PRODUCT_ID], inplace=True) colending_config = get_colending_type(colending_config, customer_rps) repayment_dataframe = get_colending_type(repayment_dataframe, customer_rps) lms_config_df.drop_duplicates([LOAN_ID, PRODUCT_PARTNERSHIP_ID], inplace=True) charges_summary = pd.DataFrame(columns=ChargeSummary().charge_summary_keys) if CHARGES_SUMMARY_LIST in df.columns: charges_summary = convert_to_dataframe(df[CHARGES_SUMMARY_LIST].explode()) charges_summary = check_empty_dataframe(charges_summary, ChargeSummary().charge_summary_keys) charges_config = convert_to_dataframe(df[CHARGES_CONFIG].explode()) charges_config = check_empty_dataframe(charges_config, ChargesConfig().charges_config) if CARRY_INFO in df.columns: carry_info = pd.json_normalize(df[CARRY_INFO].explode()) carry_info = check_empty_dataframe(carry_info, CarryInfo().carry_info_keys) else: carry_info = pd.DataFrame(columns=CarryInfo().carry_info_keys) if UNPAID_INSTALMENTS in df.columns: unpaid_instalments = df[UNPAID_INSTALMENTS].explode() unpaid_instalments = pd.json_normalize(unpaid_instalments[unpaid_instalments.notna()]) else: unpaid_instalments = pd.DataFrame() dues_factory = DuesFactory() dues_factory.dues_dataframe = df dues_factory.as_of_date = as_of_date dues_factory.carry_info = carry_info dues_factory.batch_id = batch_id dues_factory.charge_summary = charges_summary dues_calculator = dues_factory.generate_dues() due_entries, charges_due, charges_summary, carry_info= dues_calculator.generate_dues_data() if not colending_config.empty: due_entries = due_entries.merge(colending_config, on=[LOAN_ID, PRODUCT_PARTNERSHIP_ID, PRODUCT_ID]) cumulative_calculator = dues_factory.calculate_cumulative_dues() cumulative_dues = cumulative_calculator.calculate_cumulative_dues(due_entries=due_entries,charges_summary=charges_summary,charges_config=charges_config) excess_payment_allocator = AllocateExcessRepayments(repayment_dataframe=repayment_dataframe, config=lms_config_df, dues_dataframe=due_entries, unpaid_instalments=unpaid_instalments, loan_state=cumulative_dues, as_of_date=as_of_date, batch_id=batch_id, repayment_purpose=EXCESS_ALLOCATION, colending_config=colending_config, partner_rps=partner_rps, customer_rps=customer_rps, repayment_allocation_logic=None) if not repayment_dataframe.empty: excess_payment_response = excess_payment_allocator.allocate_repayments() else: excess_payment_response = { COMPUTE_RESULT: { CHARGES_DUE: convert_to_json(charges_due), CHARGES_SUMMARY: convert_to_json(charges_summary), ALLOCATIONS: convert_to_json(due_entries), LOAN_STATES: convert_to_json(cumulative_dues), CARRY_INFO: convert_to_json(carry_info) } } excess_payment_response[BATCH_ID] = batch_id excess_payment_response[AS_OF_DATE] = as_of_date logger.info("Generated the dues and pushing the response to EHS.") logger.debug(excess_payment_response) #channel.basic_ack(delivery_tag=method.delivery_tag) #publish_events(config.RESULT_ROUTING_KEY, excess_payment_response, ehsClient, batch_id) except Exception: logger.exception(traceback.format_exc()) ''' (406, 'PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more') channel is available here. use the ehs client for saving the erroneous packet ''' #nack_message(channel, method.delivery_tag, ehs_client=ehsClient, batch_id=batch_id) with open('sample3.json') as f: inp = json.load(f) generate_dues(None, None, None, inp, None, None) -------------------------------------------- replay/replay_trigger.py import json import traceback import pandas as pd from logger.handler import Logger from replay.replay_calculation_factory.replay_calculation_factory import ReplayCalculationFactory from util.models.accrual import Accrual from util.models.carry_info import CarryInfo from util.models.charge_summary import ChargeSummary from util.models.charges import ChargesAccrual, ChargesConfig, IncidentalCharges, ChargesDue from util.models.colending import Colending from util.models.collection import Collection from util.models.disbursement import Disbursement from util.models.interest_rate import InterestRate from util.models.lender_collection import LenderCollection from util.models.lender_collection_info import LenderCollectionInfo from util.models.loan_state import LoanState from util.models.moratorium_data import Moratorium from util.models.repayment_allocation import RepaymentAllocation from util.models.repayment_schedule import RepaymentSchedule from util.models.repayment_summary import RepaymentSummary from util.models.transfer_data import TransferData from util.string_constants import BATCH_ID, START_DATE, END_DATE, EVENTS, DISBURSEMENT, RPS, \ ACCRUAL, ALLOCATIONS, REPAYMENT_SUMMARY, CHARGES_ACCRUAL, CHARGES_SUMMARY, CHARGES_DUE, LOAN_ID, INTEREST_RATES, \ LOAN_STATES, REPLAY_ID, config_util, LENDER_COLLECTION_INFO, LENDER_COLLECTION, LENDER_TRANSFER_DATA, \ COLENDING_COLLECTION, TRANSFER_DATA, COLLECTION_INFO, CARRY_INFO, MORATORIUMS from util.util_functions import convert_to_dataframe, check_empty_dataframe, ack_message, nack_message logger = Logger() def trigger_replay(channel, method, _header_frame, body, args, connection): try: ehsClient = args batch_id = body.get(BATCH_ID) replay_id = body.get(REPLAY_ID) start_date = body.get(START_DATE) end_date = body.get(END_DATE) logger.info( f"Received input to trigger replay for loan id: {body.get(LOAN_ID)} and batch_id: {batch_id} with start date as {start_date} and end_date as {end_date}.") logger.debug(body) data = body.get(EVENTS) dataframe = convert_to_dataframe(data) if config_util.getProperty('REPLAY_BATCHING') is not None and "True".__eq__( config_util.getProperty("REPLAY_BATCHING")): replay_id = pd.json_normalize(dataframe['loanToReplayIdMap'].explode()) replay_ids = list(replay_id[REPLAY_ID].unique()) else: replay_id = [body.get(REPLAY_ID)] config_dataframe = pd.json_normalize(dataframe['lmsConfig'].explode()) disbursement_dataframe = pd.json_normalize(dataframe[DISBURSEMENT].explode()) disbursement_dataframe = check_empty_dataframe(disbursement_dataframe, Disbursement().disbursement_keys) colending_dataframe = pd.json_normalize(dataframe['colending'].explode()) colending_dataframe = check_empty_dataframe(colending_dataframe, Colending().colending_keys) interest_rate_dataframe = pd.json_normalize(dataframe[INTEREST_RATES].explode()) interest_rate_dataframe = check_empty_dataframe(interest_rate_dataframe, InterestRate().interest_rate) rps = pd.json_normalize(dataframe[RPS.lower()].explode()) rps = check_empty_dataframe(rps, RepaymentSchedule().repayment_schedule_keys) accrual_dataframe = pd.json_normalize(dataframe[ACCRUAL].explode()) accrual_dataframe = check_empty_dataframe(accrual_dataframe, Accrual().accrual_keys) allocations_dataframe = pd.json_normalize(dataframe[ALLOCATIONS].explode()) allocations_dataframe = check_empty_dataframe(allocations_dataframe, RepaymentAllocation().repayment_allocation_keys) repayment_summary = pd.json_normalize(dataframe[REPAYMENT_SUMMARY].explode()) repayment_summary = check_empty_dataframe(repayment_summary, RepaymentSummary().repayment_summary_keys) charges_accrual = pd.json_normalize(dataframe[CHARGES_ACCRUAL].explode()) charges_accrual = check_empty_dataframe(charges_accrual, ChargesAccrual().charges_accrual_key) charges_summary = pd.json_normalize(dataframe[CHARGES_SUMMARY].explode()) charges_summary = check_empty_dataframe(charges_summary, ChargeSummary().charge_summary_keys) loan_state = pd.json_normalize(dataframe[LOAN_STATES].explode()) loan_state = check_empty_dataframe(loan_state, LoanState().loan_state_keys) charges_config = pd.json_normalize(dataframe['chargesConfig'].explode()) charges_config = check_empty_dataframe(charges_config, ChargesConfig().charges_config) incidental_charges = pd.json_normalize(dataframe['incidentalCharges'].explode()) incidental_charges = check_empty_dataframe(incidental_charges, IncidentalCharges().incidental_charges) charges_due = pd.json_normalize( dataframe[CHARGES_DUE].explode()) if CHARGES_DUE in dataframe.columns else pd.DataFrame() charges_due = check_empty_dataframe(charges_due, ChargesDue().charges_due_key) repayment_collection = pd.json_normalize(dataframe['repaymentCollection'].explode()) repayment_collection = check_empty_dataframe(repayment_collection, Collection().collection_keys) manual_allocations = pd.json_normalize( dataframe['manualAllocations'].explode()) if 'manualAllocations' in dataframe.columns else pd.DataFrame() manual_allocations = check_empty_dataframe(manual_allocations, RepaymentAllocation().manual_allocation_keys) lender_collection = pd.json_normalize(dataframe[LENDER_COLLECTION].explode()) lender_collection = check_empty_dataframe(lender_collection, LenderCollection().lender_collection_keys) transfer_data = pd.json_normalize(dataframe[TRANSFER_DATA].explode()) transfer_data = check_empty_dataframe(transfer_data, TransferData().transfer_data_keys) collection_info = pd.json_normalize(dataframe[COLLECTION_INFO].explode()) collection_info = check_empty_dataframe(collection_info, LenderCollectionInfo().lender_collection_info_keys) carry_info = pd.DataFrame(columns=CarryInfo().carry_info_keys) moratorium_data = pd.json_normalize( dataframe[MORATORIUMS].explode()) if MORATORIUMS in dataframe.columns else pd.DataFrame() moratorium_data = check_empty_dataframe(moratorium_data, Moratorium().morat_keys) if CARRY_INFO in dataframe.columns: carry_info = pd.json_normalize(dataframe[CARRY_INFO].explode()) carry_info = check_empty_dataframe(carry_info, CarryInfo().carry_info_keys) replay_calculation_factory = ReplayCalculationFactory(batch_id=batch_id, start_date=start_date, end_date=end_date, config_df=config_dataframe, disbursement_df=disbursement_dataframe, colending_df=colending_dataframe, interest_rate_df=interest_rate_dataframe, rps_df=rps, accrual_df=accrual_dataframe, allocations_df=allocations_dataframe, repayment_summary_df=repayment_summary, charges_accrual_df=charges_accrual, charges_summary_df=charges_summary, loan_state_df=loan_state, charges_config=charges_config, incidental_charges=incidental_charges, charges_due=charges_due, repayment_collection=repayment_collection, manual_allocations=manual_allocations, ehs_client=ehsClient, replay_id=replay_id, lender_collection=lender_collection, transfer_data=transfer_data, lender_collection_info=collection_info, carry_info=carry_info, morat_data=moratorium_data ) replay_calculation_factory.trigger_replay(None, None) logger.info("The replay is successful and pushed the response to EHS.") except Exception as e: logger.exception(traceback.format_exc()) if config_util.getProperty('REPLAY_BATCHING') is not None and "True".__eq__( config_util.getProperty("REPLAY_BATCHING")): replay_id = replay_ids # channel.basic_nack(delivery_tag=method.delivery_tag, multiple=False, requeue=False) #nack_message(channel, method.delivery_tag, ehs_client=ehsClient, replay_id=replay_id) with open('sample3.json') as f: inp = json.load(f) trigger_replay(None, None, None, inp, None, None) --------------------------------------- util/config_util.py import time from kazoo.exceptions import SessionExpiredError from zookeeper_setup.zookeeper_client import ZookeeperClient # from kazoo.client import DataWatch #zClient = ZookeeperClient() serviceName = "/services/omniservicingbusiness/" properties = dict() class ConfigUtil: def load_full_tree(self, serviceName="", propertiesMap=None): if propertiesMap is None: propertiesMap = dict() start = time.time() # for child in zClient.zk.get_children(serviceName): # propertiesMap[child] = self.getProperty(child) print("time taken : ", str(start - time.time())) def __init__(self): self.load_full_tree(serviceName, properties) def pathWatcher(self, event): """ @param event: @return: """ keyName = str(event[2]) #keyName = keyName.replace(serviceName, "") self.getProperty(keyName=keyName, reset=True) def getProperty(self, keyName, reset=False): zKeyName = serviceName + keyName if keyName in properties and not reset: return properties[keyName] #try: #if zClient.zk.exists(zKeyName): # data, stat = zClient.zk.get(zKeyName, watch=self.pathWatcher) # if data is not None: # properties[keyName] = data.decode("utf-8") # return properties[keyName] #except SessionExpiredError: # print("Zookeeper session expired. Trying to connect again!") # zClient.zk.stop() # zClient.zk.start() # self.getProperty(keyName) return None
Editor is loading...
Leave a Comment