Untitled
unknown
plain_text
a year ago
18 kB
15
Indexable
due_generation/DueGenerator.py:
import traceback
import pandas as pd
from due_generation.due_generation_factory.dues_factory import DuesFactory
from logger.handler import Logger
from rabbitmq_setup.producer import publish_events
from repayment_allocation.repayment_allocator.repayment_allocation import AllocateExcessRepayments
from util import config
from util.models.carry_info import CarryInfo
from util.models.charge_summary import ChargeSummary
from util.models.charges import ChargesConfig
from util.models.repayment_summary import RepaymentSummary
from util.string_constants import BATCH_ID, EVENTS, AS_OF_DATE, REPAYMENT_SUMMARY, UNPAID_INSTALMENTS, \
EXCESS_ALLOCATION, LOAN_ID, PRODUCT_PARTNERSHIP_ID, COMPUTE_RESULT, ALLOCATIONS, LOAN_STATES, \
PRODUCT_ID, FUNDING_ALLOCATION, CHARGES_DUE, CHARGES_SUMMARY, CHARGES_SUMMARY_LIST, CARRY_INFO,CHARGES_CONFIG
from util.util_functions import (convert_to_dataframe, get_lms_config_df, convert_to_json, nack_message,
get_colending_type, check_empty_dataframe)
logger = Logger()
import json
def generate_dues(channel, method, _header_frame, body, args, connection):
    """Generate dues for one batch message and assemble the response payload.

    The events in ``body`` are converted to a DataFrame, split into the
    per-topic frames the calculators need (repayments, configs, charges,
    carry info, unpaid instalments), run through ``DuesFactory`` and the
    cumulative-dues calculator, and finally through
    ``AllocateExcessRepayments`` when there are repayments to allocate.

    Acknowledging the message and publishing the result are currently
    disabled (see the commented calls), so the response is only logged.

    Args:
        channel: Channel the message arrived on; only referenced by the
            disabled ack/nack calls.
        method: Delivery metadata; only referenced by the disabled ack/nack
            calls.
        _header_frame: Unused.
        body: Decoded message mapping; must contain the ``BATCH_ID``,
            ``EVENTS`` and ``AS_OF_DATE`` keys.
        args: EHS client; only referenced by the disabled publish/nack calls.
        connection: Unused.

    Returns:
        None. Any exception is logged and swallowed so that the consumer
        callback itself never raises.
    """
    # Bound before the ``try`` so the error path (and the nack call, once it
    # is re-enabled) can never hit an unbound name.
    ehs_client = args
    batch_id = None
    try:
        payload = body
        batch_id = payload[BATCH_ID]
        logger.info("Received input for due generation for the batch id : {}".format(batch_id))
        logger.debug(payload)
        data = payload[EVENTS]
        as_of_date = payload[AS_OF_DATE]
        df = convert_to_dataframe(data)

        # Rows with FUNDING_ALLOCATION == 1 form the customer schedule; every
        # other row belongs to the partner schedule.
        customer_rps = df[df[FUNDING_ALLOCATION] == 1]
        partner_rps = df[df[FUNDING_ALLOCATION] != 1]

        if REPAYMENT_SUMMARY in df.columns:
            repayment_dataframe = df[REPAYMENT_SUMMARY].explode()
            repayment_dataframe = pd.json_normalize(repayment_dataframe[repayment_dataframe.notna()])
        else:
            repayment_dataframe = pd.DataFrame(columns=RepaymentSummary().repayment_summary_keys)

        # A fresh filtered frame is passed on purpose: it keeps customer_rps
        # untouched in case get_lms_config_df modifies its argument.
        lms_config_df = get_lms_config_df(df[df[FUNDING_ALLOCATION] == 1])
        colending_config = get_lms_config_df(df)
        colending_config.drop_duplicates([LOAN_ID, PRODUCT_PARTNERSHIP_ID, PRODUCT_ID], inplace=True)
        colending_config = get_colending_type(colending_config, customer_rps)
        repayment_dataframe = get_colending_type(repayment_dataframe, customer_rps)
        lms_config_df.drop_duplicates([LOAN_ID, PRODUCT_PARTNERSHIP_ID], inplace=True)

        charges_summary = pd.DataFrame(columns=ChargeSummary().charge_summary_keys)
        if CHARGES_SUMMARY_LIST in df.columns:
            charges_summary = convert_to_dataframe(df[CHARGES_SUMMARY_LIST].explode())
            charges_summary = check_empty_dataframe(charges_summary, ChargeSummary().charge_summary_keys)

        # NOTE(review): unlike the other optional columns, CHARGES_CONFIG is
        # read without a presence check - confirm it is always supplied.
        charges_config = convert_to_dataframe(df[CHARGES_CONFIG].explode())
        charges_config = check_empty_dataframe(charges_config, ChargesConfig().charges_config)

        if CARRY_INFO in df.columns:
            carry_info = pd.json_normalize(df[CARRY_INFO].explode())
            carry_info = check_empty_dataframe(carry_info, CarryInfo().carry_info_keys)
        else:
            carry_info = pd.DataFrame(columns=CarryInfo().carry_info_keys)

        if UNPAID_INSTALMENTS in df.columns:
            unpaid_instalments = df[UNPAID_INSTALMENTS].explode()
            unpaid_instalments = pd.json_normalize(unpaid_instalments[unpaid_instalments.notna()])
        else:
            unpaid_instalments = pd.DataFrame()

        dues_factory = DuesFactory()
        dues_factory.dues_dataframe = df
        dues_factory.as_of_date = as_of_date
        dues_factory.carry_info = carry_info
        dues_factory.batch_id = batch_id
        dues_factory.charge_summary = charges_summary
        dues_calculator = dues_factory.generate_dues()
        due_entries, charges_due, charges_summary, carry_info = dues_calculator.generate_dues_data()

        if not colending_config.empty:
            due_entries = due_entries.merge(colending_config, on=[LOAN_ID, PRODUCT_PARTNERSHIP_ID, PRODUCT_ID])

        cumulative_calculator = dues_factory.calculate_cumulative_dues()
        cumulative_dues = cumulative_calculator.calculate_cumulative_dues(
            due_entries=due_entries,
            charges_summary=charges_summary,
            charges_config=charges_config,
        )

        # Constructed unconditionally, exactly as before, in case the
        # constructor itself has effects the rest of the flow relies on.
        excess_payment_allocator = AllocateExcessRepayments(
            repayment_dataframe=repayment_dataframe,
            config=lms_config_df,
            dues_dataframe=due_entries,
            unpaid_instalments=unpaid_instalments,
            loan_state=cumulative_dues,
            as_of_date=as_of_date,
            batch_id=batch_id,
            repayment_purpose=EXCESS_ALLOCATION,
            colending_config=colending_config,
            partner_rps=partner_rps,
            customer_rps=customer_rps,
            repayment_allocation_logic=None,
        )

        if not repayment_dataframe.empty:
            excess_payment_response = excess_payment_allocator.allocate_repayments()
        else:
            # Nothing to allocate: return the freshly computed dues as they are.
            excess_payment_response = {
                COMPUTE_RESULT: {
                    CHARGES_DUE: convert_to_json(charges_due),
                    CHARGES_SUMMARY: convert_to_json(charges_summary),
                    ALLOCATIONS: convert_to_json(due_entries),
                    LOAN_STATES: convert_to_json(cumulative_dues),
                    CARRY_INFO: convert_to_json(carry_info),
                }
            }

        excess_payment_response[BATCH_ID] = batch_id
        excess_payment_response[AS_OF_DATE] = as_of_date
        logger.info("Generated the dues and pushing the response to EHS.")
        logger.debug(excess_payment_response)
        # channel.basic_ack(delivery_tag=method.delivery_tag)
        # publish_events(config.RESULT_ROUTING_KEY, excess_payment_response, ehs_client, batch_id)
    except Exception:
        logger.exception(traceback.format_exc())
        # Known failure seen here:
        #   (406, 'PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out.
        #   Timeout value used: 1800000 ms. This timeout value can be configured,
        #   see consumers doc guide to learn more')
        # The channel is still available at this point; use the EHS client for
        # saving the erroneous packet.
        # nack_message(channel, method.delivery_tag, ehs_client=ehs_client, batch_id=batch_id)
# Local debugging harness: feeds a captured message from disk straight into
# generate_dues without a broker. It is guarded so that merely importing this
# module no longer reads sample3.json and runs the whole pipeline.
if __name__ == "__main__":
    with open('sample3.json', encoding='utf-8') as f:
        inp = json.load(f)
    generate_dues(None, None, None, inp, None, None)
--------------------------------------------
replay/replay_trigger.py
import json
import traceback
import pandas as pd
from logger.handler import Logger
from replay.replay_calculation_factory.replay_calculation_factory import ReplayCalculationFactory
from util.models.accrual import Accrual
from util.models.carry_info import CarryInfo
from util.models.charge_summary import ChargeSummary
from util.models.charges import ChargesAccrual, ChargesConfig, IncidentalCharges, ChargesDue
from util.models.colending import Colending
from util.models.collection import Collection
from util.models.disbursement import Disbursement
from util.models.interest_rate import InterestRate
from util.models.lender_collection import LenderCollection
from util.models.lender_collection_info import LenderCollectionInfo
from util.models.loan_state import LoanState
from util.models.moratorium_data import Moratorium
from util.models.repayment_allocation import RepaymentAllocation
from util.models.repayment_schedule import RepaymentSchedule
from util.models.repayment_summary import RepaymentSummary
from util.models.transfer_data import TransferData
from util.string_constants import BATCH_ID, START_DATE, END_DATE, EVENTS, DISBURSEMENT, RPS, \
ACCRUAL, ALLOCATIONS, REPAYMENT_SUMMARY, CHARGES_ACCRUAL, CHARGES_SUMMARY, CHARGES_DUE, LOAN_ID, INTEREST_RATES, \
LOAN_STATES, REPLAY_ID, config_util, LENDER_COLLECTION_INFO, LENDER_COLLECTION, LENDER_TRANSFER_DATA, \
COLENDING_COLLECTION, TRANSFER_DATA, COLLECTION_INFO, CARRY_INFO, MORATORIUMS
from util.util_functions import convert_to_dataframe, check_empty_dataframe, ack_message, nack_message
logger = Logger()
def _normalized_frame(dataframe, column, expected_keys=None, optional=False):
    """Explode a list-valued column and flatten its records into a DataFrame.

    Args:
        dataframe: Frame built from the message events.
        column: Name of the column holding the nested records.
        expected_keys: When given, the result is passed through
            ``check_empty_dataframe`` together with these keys.
        optional: When True, a missing column yields ``pd.DataFrame()``
            instead of raising ``KeyError``.

    Returns:
        The flattened (and, if requested, checked) DataFrame.
    """
    if optional and column not in dataframe.columns:
        frame = pd.DataFrame()
    else:
        frame = pd.json_normalize(dataframe[column].explode())
    if expected_keys is not None:
        frame = check_empty_dataframe(frame, expected_keys)
    return frame


def _replay_batching_enabled():
    """Return True when the REPLAY_BATCHING property equals the string "True"."""
    # A plain ``==`` is False for None and for non-string values, whereas
    # ``"True".__eq__(x)`` yields the truthy ``NotImplemented`` for the latter.
    return config_util.getProperty('REPLAY_BATCHING') == "True"


def trigger_replay(channel, method, _header_frame, body, args, connection):
    """Run a replay calculation for one replay message.

    The events in ``body`` are converted to a DataFrame, every nested topic
    is flattened into its own frame, and the frames are handed to
    ``ReplayCalculationFactory`` which performs the replay.

    Acknowledging / rejecting the message is currently disabled (see the
    commented calls in the error path).

    Args:
        channel: Channel the message arrived on; only referenced by the
            disabled nack calls.
        method: Delivery metadata; only referenced by the disabled nack calls.
        _header_frame: Unused.
        body: Decoded message mapping; read with ``.get`` for the batch id,
            replay id, start/end dates, loan id and events.
        args: EHS client, forwarded to the factory.
        connection: Unused.

    Returns:
        None. Any exception is logged and swallowed so that the consumer
        callback itself never raises.
    """
    # Everything the error path reads is bound before the ``try``. Previously
    # ``replay_ids`` only existed after the batching branch ran, so an early
    # failure raised NameError inside the handler and hid the real error.
    ehs_client = args
    replay_id = None
    replay_ids = []
    batching_enabled = False
    try:
        batch_id = body.get(BATCH_ID)
        replay_id = body.get(REPLAY_ID)
        start_date = body.get(START_DATE)
        end_date = body.get(END_DATE)
        logger.info(
            f"Received input to trigger replay for loan id: {body.get(LOAN_ID)} and batch_id: {batch_id} with start date as {start_date} and end_date as {end_date}.")
        logger.debug(body)
        data = body.get(EVENTS)
        dataframe = convert_to_dataframe(data)

        batching_enabled = _replay_batching_enabled()
        if batching_enabled:
            # NOTE(review): in batching mode the factory receives the whole
            # loan-to-replay-id frame as ``replay_id`` while the plain id list
            # is only used on the error path - confirm this is intended.
            replay_id = _normalized_frame(dataframe, 'loanToReplayIdMap')
            replay_ids = list(replay_id[REPLAY_ID].unique())
        else:
            replay_id = [body.get(REPLAY_ID)]

        config_dataframe = _normalized_frame(dataframe, 'lmsConfig')
        disbursement_dataframe = _normalized_frame(dataframe, DISBURSEMENT, Disbursement().disbursement_keys)
        colending_dataframe = _normalized_frame(dataframe, 'colending', Colending().colending_keys)
        interest_rate_dataframe = _normalized_frame(dataframe, INTEREST_RATES, InterestRate().interest_rate)
        rps = _normalized_frame(dataframe, RPS.lower(), RepaymentSchedule().repayment_schedule_keys)
        accrual_dataframe = _normalized_frame(dataframe, ACCRUAL, Accrual().accrual_keys)
        allocations_dataframe = _normalized_frame(dataframe, ALLOCATIONS,
                                                  RepaymentAllocation().repayment_allocation_keys)
        repayment_summary = _normalized_frame(dataframe, REPAYMENT_SUMMARY,
                                              RepaymentSummary().repayment_summary_keys)
        charges_accrual = _normalized_frame(dataframe, CHARGES_ACCRUAL, ChargesAccrual().charges_accrual_key)
        charges_summary = _normalized_frame(dataframe, CHARGES_SUMMARY, ChargeSummary().charge_summary_keys)
        loan_state = _normalized_frame(dataframe, LOAN_STATES, LoanState().loan_state_keys)
        charges_config = _normalized_frame(dataframe, 'chargesConfig', ChargesConfig().charges_config)
        incidental_charges = _normalized_frame(dataframe, 'incidentalCharges',
                                               IncidentalCharges().incidental_charges)
        charges_due = _normalized_frame(dataframe, CHARGES_DUE, ChargesDue().charges_due_key, optional=True)
        repayment_collection = _normalized_frame(dataframe, 'repaymentCollection', Collection().collection_keys)
        manual_allocations = _normalized_frame(dataframe, 'manualAllocations',
                                               RepaymentAllocation().manual_allocation_keys, optional=True)
        lender_collection = _normalized_frame(dataframe, LENDER_COLLECTION,
                                              LenderCollection().lender_collection_keys)
        transfer_data = _normalized_frame(dataframe, TRANSFER_DATA, TransferData().transfer_data_keys)
        collection_info = _normalized_frame(dataframe, COLLECTION_INFO,
                                            LenderCollectionInfo().lender_collection_info_keys)
        moratorium_data = _normalized_frame(dataframe, MORATORIUMS, Moratorium().morat_keys, optional=True)

        # Carry info keeps its own fallback: an empty frame that already has
        # the expected columns, without going through check_empty_dataframe.
        if CARRY_INFO in dataframe.columns:
            carry_info = _normalized_frame(dataframe, CARRY_INFO, CarryInfo().carry_info_keys)
        else:
            carry_info = pd.DataFrame(columns=CarryInfo().carry_info_keys)

        replay_calculation_factory = ReplayCalculationFactory(
            batch_id=batch_id,
            start_date=start_date,
            end_date=end_date,
            config_df=config_dataframe,
            disbursement_df=disbursement_dataframe,
            colending_df=colending_dataframe,
            interest_rate_df=interest_rate_dataframe,
            rps_df=rps,
            accrual_df=accrual_dataframe,
            allocations_df=allocations_dataframe,
            repayment_summary_df=repayment_summary,
            charges_accrual_df=charges_accrual,
            charges_summary_df=charges_summary,
            loan_state_df=loan_state,
            charges_config=charges_config,
            incidental_charges=incidental_charges,
            charges_due=charges_due,
            repayment_collection=repayment_collection,
            manual_allocations=manual_allocations,
            ehs_client=ehs_client,
            replay_id=replay_id,
            lender_collection=lender_collection,
            transfer_data=transfer_data,
            lender_collection_info=collection_info,
            carry_info=carry_info,
            morat_data=moratorium_data,
        )
        replay_calculation_factory.trigger_replay(None, None)
        logger.info("The replay is successful and pushed the response to EHS.")
    except Exception:
        logger.exception(traceback.format_exc())
        if batching_enabled:
            # In batching mode the nack must carry every replay id of the batch.
            replay_id = replay_ids
        # channel.basic_nack(delivery_tag=method.delivery_tag, multiple=False, requeue=False)
        # nack_message(channel, method.delivery_tag, ehs_client=ehs_client, replay_id=replay_id)
# Local debugging harness: feeds a captured message from disk straight into
# trigger_replay without a broker. It is guarded so that merely importing this
# module no longer reads sample3.json and runs a full replay.
if __name__ == "__main__":
    with open('sample3.json', encoding='utf-8') as f:
        inp = json.load(f)
    trigger_replay(None, None, None, inp, None, None)
---------------------------------------
util/config_util.py
import time
from kazoo.exceptions import SessionExpiredError
from zookeeper_setup.zookeeper_client import ZookeeperClient
# from kazoo.client import DataWatch
#zClient = ZookeeperClient()
# Prefix that is prepended to a property name to build its full key.
serviceName = "/services/omniservicingbusiness/"
# Module-wide cache of property values, keyed by property name.
properties = dict()


class ConfigUtil:
    """Cached access to the service's configuration properties.

    The ZooKeeper-backed loading is currently disabled (the calls are kept
    below as comments), so only values that are already present in the
    module-level ``properties`` dict can be returned.
    """

    def __init__(self):
        self.load_full_tree(serviceName, properties)

    def load_full_tree(self, serviceName="", propertiesMap=None):
        """Load every property under ``serviceName`` into ``propertiesMap``.

        With the ZooKeeper walk disabled this only reports how long the
        (empty) load took.

        @param serviceName: prefix whose children would be loaded.
        @param propertiesMap: dict to fill; a new dict is used when None.
        @return: None
        """
        if propertiesMap is None:
            propertiesMap = dict()
        # perf_counter is monotonic, so the elapsed value can never be
        # negative; the previous ``start - now`` always printed a value <= 0.
        start = time.perf_counter()
        # for child in zClient.zk.get_children(serviceName):
        #     propertiesMap[child] = self.getProperty(child)
        elapsed = time.perf_counter() - start
        print("time taken : ", str(elapsed))

    def pathWatcher(self, event):
        """Re-read the property named by a watch event.

        @param event: watch event; its element at index 2 is used as the
            property name.
        @return: None
        """
        keyName = str(event[2])
        # NOTE(review): if event[2] is a full node path, the disabled line
        # below is probably needed again once ZooKeeper is re-enabled,
        # otherwise the prefix would be applied twice - confirm.
        # keyName = keyName.replace(serviceName, "")
        self.getProperty(keyName=keyName, reset=True)

    def getProperty(self, keyName, reset=False):
        """Return the value of ``keyName``, or None when it is not available.

        @param keyName: property name without the service prefix.
        @param reset: when True the cached value is ignored.
        @return: the cached value, or None if there is no cached value or
            ``reset`` is True (the ZooKeeper lookup is disabled).
        """
        # Full key; only needed by the disabled ZooKeeper lookup below.
        zKeyName = serviceName + keyName
        if keyName in properties and not reset:
            return properties[keyName]
        # try:
        #     if zClient.zk.exists(zKeyName):
        #         data, stat = zClient.zk.get(zKeyName, watch=self.pathWatcher)
        #         if data is not None:
        #             properties[keyName] = data.decode("utf-8")
        #             return properties[keyName]
        # except SessionExpiredError:
        #     print("Zookeeper session expired. Trying to connect again!")
        #     zClient.zk.stop()
        #     zClient.zk.start()
        #     self.getProperty(keyName)
        return None
Editor is loading...
Leave a Comment