import logging
import os
from app.base.Constantes import PLVECTOR_NUM_DESKS, MAPPER_PREFIX, RUTA_MAPPER_PRE
from app.data_loader.MDXDataLoader import MDXDataLoader
import numpy as np
import pandas as pd
from arq.Constants import GENERAL_BLOCK
logger = logging.getLogger(__name__)
class PLVectorLoader(MDXDataLoader):
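    """
    Loads PnL vectors from the MDX cube, optionally split by trading desk, and
    joins them with the SHAT mapper files to resolve MUREX instruments.
    """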
def __init__(self, *args):
super().__init__(*args)
self.query_results_list = []
self.as_of_date = None
self.mapper_df = None
self.letras_isin = None
self.mdx_parameters = None
def extract(self, mdx_parameters, split_by_desk=False, risk_classes=None, *args, **kwargs):
self.as_of_date = mdx_parameters['AsOfDate']
self.mdx_parameters = mdx_parameters
if risk_classes: # Only required for Taylor VaR
try:
mdx_parameters['risk_class1'] = risk_classes[0]
mdx_parameters['risk_class2'] = risk_classes[1]
except IndexError:
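                # Only one risk class was supplied; fill the second slot with a placeholder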
mdx_parameters['risk_class2'] = 'DummyRiskClass'
if split_by_desk:
desk_list = self.get_desk_list(mdx_parameters)
# desk_list = ['STM', 'Credit BS Madrid', 'SLB Rates'] # Uncomment this to do more relevant tests
for desk in desk_list:
mdx_parameters['desk'] = desk
super().extract(mdx_parameters)
if self.query_result.empty:
logger.warning('Query %s with parameters %s received zero results.', self.mdx_query_name,
self.parameter_map)
else:
self.query_results_list.append(self.query_result.reset_index())
self.query_result = pd.concat(self.query_results_list)
else:
super().extract(mdx_parameters)
def read_mapper_from_file(self, join_with_mapper, unit, keep_df=True):
"""
Parameters
----------
join_with_mapper = IR, CR
unit = MAD, SLB
"""
if unit == 'MAD':
books = {'BOA': 'BOADILLA', 'OFF': 'OFFSHORE'}
separator = '#'
elif unit == 'SLB':
books = {'SLB': 'SLB'}
separator = '#'
else:
raise NotImplementedError
mapper_list = []
for book, book_long in books.items():
filename = '{asset_class}_SHAT_{book_long}_GS_inst_{date}.csv'.format(asset_class=join_with_mapper,
book_long=book_long,
date=self.as_of_date.replace('-', ''))
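            # e.g. 'CR_SHAT_BOADILLA_GS_inst_20240131.csv' (date shown is illustrative)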
mapper_file_path = os.path.join(self.catalog.get_property(GENERAL_BLOCK, MAPPER_PREFIX),
RUTA_MAPPER_PRE.format(data=self.as_of_date, p1=unit, p2=book),
filename)
mapper_list.append(pd.read_csv(mapper_file_path, delimiter=';', header=None))
mapper_df = pd.concat(mapper_list)
if join_with_mapper == 'IR':
mapper_df[10] = mapper_df[10].str.split(separator).str[-1]
elif join_with_mapper == 'CR':
mapper_df[10] = mapper_df[10].str.split(separator).str[-6]
mapper_df = mapper_df.rename(columns={0: "SecName", 1: "Leg", 10: "MUREX_instrument"})
mapper_df['SecName_UP'] = mapper_df['SecName'].str.upper()
mapper_df = mapper_df[['SecName_UP', 'Leg', 'MUREX_instrument']]
if unit == 'SLB':
mapper_df['MUREX_instrument'] = self.slb_isin_to_mxinstr(mapper_df)
if keep_df:
self.mapper_df = mapper_df
else:
return mapper_df
def slb_isin_to_mxinstr(self, mapper_df: pd.DataFrame):
"""
        Mariner sends ISINs in the MX instrument split in SecName.
        This function finds those ISINs and replaces them with the actual MX instrument.
"""
query_params = {'AsOfDate': self.as_of_date}
isin_to_label = MDXDataLoader(self.catalog, 'SLB_ISIN_TO_SELABEL', self.source_cube)
isin_to_label.extract(query_params)
isin_to_label.transform()
isin_to_label.transformed_df = isin_to_label.transformed_df[isin_to_label.transformed_df['security_label'] != 'N/A']
isin_to_label.transformed_df = isin_to_label.transformed_df[
isin_to_label.transformed_df['security_code'] != 'N/A']
mapper_df_isin = mapper_df.copy()
mapper_df_isin = mapper_df_isin.merge(isin_to_label.transformed_df, how='left', left_on=['MUREX_instrument'],
right_on=['security_code'])
mapper_df_isin = mapper_df_isin[['security_label', 'MUREX_instrument']]
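        # Coalesce: prefer security_label where the ISIN was matched, otherwise keep MUREX_instrument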
mapper_df_isin['Final'] = mapper_df_isin.bfill(axis=1).iloc[:, 0]
mapper_df_isin = mapper_df_isin['Final'].rename('MUREX_instrument')
return mapper_df_isin
def transform(self, join_with_mapper=None, unit=None, *args, **kwargs):
"""
Parameters
----------
join_with_mapper = IR, CR
unit = MAD, SLB
"""
self.transformed_df = self.query_result.reset_index().copy()
if len(self.transformed_df) == 0:
logger.warning('Attempting to transform dataframe of size 0. Skipping.')
return
self.transformed_df['OutputCurrency'] = 'EUR'
self.transformed_df["PnLVector.SUM"] = self.transformed_df["PnLVector.SUM"].apply(lambda x: np.array(x))
self.transformed_df = self.transformed_df.rename(columns={"PnLVector.SUM": "PnLVectorExpand"})
if 'PortfolioParent' in list(self.transformed_df.columns):
self.transformed_df = self.transformed_df.rename(columns={"PortfolioParent": "Portfolio"})
to_string = ['SecName', "Portfolio"]
to_numeric = ['Leg']
for column in to_string:
self.transformed_df[column] = self.transformed_df[column].astype('string')
for column in to_numeric:
self.transformed_df[column] = self.transformed_df[column].astype('int')
if join_with_mapper: # Taylor - find MX instrument from mapper file
self.read_mapper_from_file(join_with_mapper, unit)
self.get_letras_list(self.mdx_parameters)
self.transformed_df['SecName_UP'] = self.transformed_df['SecName'].str.upper()
self.transformed_df = self.transformed_df.merge(self.mapper_df, how='left', on=['SecName_UP', 'Leg'])
logger.info('%d secnames matched with %s mapper',
len(self.transformed_df[self.transformed_df.MUREX_instrument.notna()]),
join_with_mapper)
            self.transformed_df = self.transformed_df.drop(['SecName_UP'], axis=1)
# Feed flag to indicate if bond is a Letra del Tesoro (if any)
if self.letras_isin is not None:
self.transformed_df['isLetter'] = self.transformed_df['MUREX_instrument'].isin(self.letras_isin)
else: # Full Reval - retrieve directly from Underlying
self.transformed_df['MUREX_instrument'] = self.transformed_df['Underlying2'].astype(str)
def get_desk_list(self, mdx_parameters):
desk_query = MDXDataLoader(self.catalog, 'TRADING_DESK_SCOPE', self.source_cube)
desk_query.extract(mdx_parameters)
desks_df = desk_query.get_query_result()
desks_df = desks_df['TradingDesk'].unique()
try: # Get number of desks from properties, otherwise return all
max_num_desks = int(self.catalog.get_property(GENERAL_BLOCK, PLVECTOR_NUM_DESKS))
            logger.warning('Limiting load to %d trading desks', max_num_desks)
return desks_df[0:min(max_num_desks, len(desks_df))]
except KeyError:
return desks_df
def get_letras_list(self, scope):
        # New req: add a flag to identify Letras del Tesoro. These can be pinpointed by
# filtering Credit Delta secnames and finding those ISINs that have Risk Factors containing "LT-"
# This is quite brittle and probably subject to change if historical series change...
logger.info('Getting ISINs that are Letras del Tesoro. Scope: %s', scope)
# Get list of Secname+Leg that correspond to letras
letras_mdx = MDXDataLoader(self.catalog, 'LETRAS_SECNAMES', self.source_cube)
letras_mdx.extract(scope)
letras_mdx.transform()
# Retrieve Credit mapper
mapper_cr = self.read_mapper_from_file('CR', scope['Country'], keep_df=False)
# Join with mapper to find ISINs (i.e. MX instruments)
try:
letras_secnames = letras_mdx.transformed_df.copy()
letras_secnames['Leg'] = letras_secnames['Leg'].astype('int')
letras_secnames['SecName_UP'] = letras_secnames['SecName'].str.upper()
letras_secnames = letras_secnames.merge(mapper_cr, how='left', on=['SecName_UP', 'Leg'])
self.letras_isin = letras_secnames['MUREX_instrument']
self.letras_isin = self.letras_isin.drop_duplicates()
return self.letras_isin
except KeyError:
            return None
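

# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original module). It shows how a
# PLVectorLoader might be driven end to end, assuming an already configured
# catalog and source cube. The query name 'PLVECTOR', the risk class label and
# the parameter values below are assumptions for illustration only.
# ---------------------------------------------------------------------------
def _example_usage(catalog, source_cube):
    mdx_parameters = {
        'AsOfDate': '2024-01-31',  # hypothetical business date
        'Country': 'MAD',          # hypothetical unit, read by get_letras_list
    }
    loader = PLVectorLoader(catalog, 'PLVECTOR', source_cube)  # query name assumed
    # Pull the PnL vectors desk by desk and concatenate the per-desk results
    loader.extract(mdx_parameters, split_by_desk=True, risk_classes=['IR Delta'])
    # Resolve MUREX instruments through the IR mapper files for the Madrid unit
    loader.transform(join_with_mapper='IR', unit='MAD')
    return loader.transformed_df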