import logging
import os

import numpy as np
import pandas as pd

from app.base.Constantes import PLVECTOR_NUM_DESKS, MAPPER_PREFIX, RUTA_MAPPER_PRE
from app.data_loader.MDXDataLoader import MDXDataLoader
from arq.Constants import GENERAL_BLOCK

logger = logging.getLogger(__name__)


class PLVectorLoader(MDXDataLoader):

    def __init__(self, *args):
        super().__init__(*args)
        self.query_results_list = []
        self.as_of_date = None
        self.mapper_df = None
        self.letras_isin = None
        self.mdx_parameters = None

    def extract(self, mdx_parameters, split_by_desk=False, risk_classes=None, *args, **kwargs):
        self.as_of_date = mdx_parameters['AsOfDate']
        self.mdx_parameters = mdx_parameters
        if risk_classes:
            # Only required for Taylor VaR
            try:
                mdx_parameters['risk_class1'] = risk_classes[0]
                mdx_parameters['risk_class2'] = risk_classes[1]
            except IndexError:
                mdx_parameters['risk_class2'] = 'DummyRiskClass'
        if split_by_desk:
            desk_list = self.get_desk_list(mdx_parameters)
            # desk_list = ['STM', 'Credit BS Madrid', 'SLB Rates']  # Uncomment this to do more relevant tests
            for desk in desk_list:
                mdx_parameters['desk'] = desk
                super().extract(mdx_parameters)
                if self.query_result.empty:
                    logger.warning('Query %s with parameters %s received zero results.',
                                   self.mdx_query_name, self.parameter_map)
                else:
                    self.query_results_list.append(self.query_result.reset_index())
            self.query_result = pd.concat(self.query_results_list)
        else:
            super().extract(mdx_parameters)

    def read_mapper_from_file(self, join_with_mapper, unit, keep_df=True):
        """
        Parameters
        ----------
        join_with_mapper : {'IR', 'CR'}
        unit : {'MAD', 'SLB'}
        """
        if unit == 'MAD':
            books = {'BOA': 'BOADILLA', 'OFF': 'OFFSHORE'}
            separator = '#'
        elif unit == 'SLB':
            books = {'SLB': 'SLB'}
            separator = '#'
        else:
            raise NotImplementedError

        mapper_list = []
        for book, book_long in books.items():
            filename = '{asset_class}_SHAT_{book_long}_GS_inst_{date}.csv'.format(
                asset_class=join_with_mapper,
                book_long=book_long,
                date=self.as_of_date.replace('-', ''))
            mapper_file_path = os.path.join(self.catalog.get_property(GENERAL_BLOCK, MAPPER_PREFIX),
                                            RUTA_MAPPER_PRE.format(data=self.as_of_date, p1=unit, p2=book),
                                            filename)
            mapper_list.append(pd.read_csv(mapper_file_path, delimiter=';', header=None))
        mapper_df = pd.concat(mapper_list)

        # Column 10 holds the MX instrument; its position within the separator-split string
        # depends on the asset class
        if join_with_mapper == 'IR':
            mapper_df[10] = mapper_df[10].str.split(separator).str[-1]
        elif join_with_mapper == 'CR':
            mapper_df[10] = mapper_df[10].str.split(separator).str[-6]

        mapper_df = mapper_df.rename(columns={0: "SecName", 1: "Leg", 10: "MUREX_instrument"})
        mapper_df['SecName_UP'] = mapper_df['SecName'].str.upper()
        mapper_df = mapper_df[['SecName_UP', 'Leg', 'MUREX_instrument']]
        if unit == 'SLB':
            mapper_df['MUREX_instrument'] = self.slb_isin_to_mxinstr(mapper_df)
        if keep_df:
            self.mapper_df = mapper_df
        else:
            return mapper_df

    def slb_isin_to_mxinstr(self, mapper_df: pd.DataFrame):
        """
        Mariner sends ISINs in the MX instrument field split out of SecName.
        This function finds those ISINs and replaces them with the actual MX instrument.
        """
        query_params = {'AsOfDate': self.as_of_date}
        isin_to_label = MDXDataLoader(self.catalog, 'SLB_ISIN_TO_SELABEL', self.source_cube)
        isin_to_label.extract(query_params)
        isin_to_label.transform()
        isin_to_label.transformed_df = isin_to_label.transformed_df[
            isin_to_label.transformed_df['security_label'] != 'N/A']
        isin_to_label.transformed_df = isin_to_label.transformed_df[
            isin_to_label.transformed_df['security_code'] != 'N/A']

        mapper_df_isin = mapper_df.copy()
        mapper_df_isin = mapper_df_isin.merge(isin_to_label.transformed_df, how='left',
                                              left_on=['MUREX_instrument'], right_on=['security_code'])
        mapper_df_isin = mapper_df_isin[['security_label', 'MUREX_instrument']]
        # Keep the security label when the ISIN was matched, otherwise fall back to the original value
        mapper_df_isin['Final'] = mapper_df_isin.bfill(axis=1).iloc[:, 0]
        mapper_df_isin = mapper_df_isin['Final'].rename('MUREX_instrument')
        return mapper_df_isin

    def transform(self, join_with_mapper=None, unit=None, *args, **kwargs):
        """
        Parameters
        ----------
        join_with_mapper : {'IR', 'CR'}
        unit : {'MAD', 'SLB'}
        """
        self.transformed_df = self.query_result.reset_index().copy()
        if len(self.transformed_df) == 0:
            logger.warning('Attempting to transform dataframe of size 0. Skipping.')
            return
        self.transformed_df['OutputCurrency'] = 'EUR'
        self.transformed_df["PnLVector.SUM"] = self.transformed_df["PnLVector.SUM"].apply(lambda x: np.array(x))
        self.transformed_df = self.transformed_df.rename(columns={"PnLVector.SUM": "PnLVectorExpand"})
        if 'PortfolioParent' in list(self.transformed_df.columns):
            self.transformed_df = self.transformed_df.rename(columns={"PortfolioParent": "Portfolio"})

        to_string = ['SecName', 'Portfolio']
        to_numeric = ['Leg']
        for column in to_string:
            self.transformed_df[column] = self.transformed_df[column].astype('string')
        for column in to_numeric:
            self.transformed_df[column] = self.transformed_df[column].astype('int')

        if join_with_mapper:
            # Taylor - find MX instrument from mapper file
            self.read_mapper_from_file(join_with_mapper, unit)
            self.get_letras_list(self.mdx_parameters)
            self.transformed_df['SecName_UP'] = self.transformed_df['SecName'].str.upper()
            self.transformed_df = self.transformed_df.merge(self.mapper_df, how='left', on=['SecName_UP', 'Leg'])
            logger.info('%d secnames matched with %s mapper',
                        len(self.transformed_df[self.transformed_df.MUREX_instrument.notna()]),
                        join_with_mapper)
            self.transformed_df = self.transformed_df.drop(['SecName_UP'], axis=1)
            # Feed flag to indicate if bond is a Letra del Tesoro (if any)
            if self.letras_isin is not None:
                self.transformed_df['isLetter'] = self.transformed_df['MUREX_instrument'].isin(self.letras_isin)
        else:
            # Full Reval - retrieve directly from Underlying
            self.transformed_df['MUREX_instrument'] = self.transformed_df['Underlying2'].astype(str)

    def get_desk_list(self, mdx_parameters):
        desk_query = MDXDataLoader(self.catalog, 'TRADING_DESK_SCOPE', self.source_cube)
        desk_query.extract(mdx_parameters)
        desks_df = desk_query.get_query_result()
        desks_df = desks_df['TradingDesk'].unique()
        try:
            # Get number of desks from properties, otherwise return all
            max_num_desks = int(self.catalog.get_property(GENERAL_BLOCK, PLVECTOR_NUM_DESKS))
            logger.warning('Limiting load to {} trading desks'.format(max_num_desks))
            return desks_df[0:min(max_num_desks, len(desks_df))]
        except KeyError:
            return desks_df

    def get_letras_list(self, scope):
        # New req: add a flag to identify Letras del Tesoro. These can be pinpointed by
        # filtering Credit Delta secnames and finding those ISINs whose Risk Factors contain "LT-".
        # This is quite brittle and probably subject to change if historical series change...
        logger.info('Getting ISINs that are Letras del Tesoro. Scope: %s', scope)
        # Get list of SecName+Leg that correspond to Letras
        letras_mdx = MDXDataLoader(self.catalog, 'LETRAS_SECNAMES', self.source_cube)
        letras_mdx.extract(scope)
        letras_mdx.transform()
        # Retrieve Credit mapper
        mapper_cr = self.read_mapper_from_file('CR', scope['Country'], keep_df=False)
        # Join with mapper to find ISINs (i.e. MX instruments)
        try:
            letras_secnames = letras_mdx.transformed_df.copy()
            letras_secnames['Leg'] = letras_secnames['Leg'].astype('int')
            letras_secnames['SecName_UP'] = letras_secnames['SecName'].str.upper()
            letras_secnames = letras_secnames.merge(mapper_cr, how='left', on=['SecName_UP', 'Leg'])
            self.letras_isin = letras_secnames['MUREX_instrument']
            self.letras_isin = self.letras_isin.drop_duplicates()
            return self.letras_isin
        except KeyError:
            return None
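

# Minimal usage sketch of how this loader might be driven end to end. The `catalog`
# object, the 'PL_VECTOR_QUERY' name, the source cube handle, the as-of date and the
# scope values below are hypothetical placeholders, not part of the original module;
# in the real project they come from the surrounding ETL configuration, and the
# extract/transform contract is the one inherited from MDXDataLoader above.
def load_pl_vectors(catalog, source_cube):
    # Constructor arguments follow the (catalog, query_name, cube) pattern used for
    # the other MDXDataLoader instances in this module.
    loader = PLVectorLoader(catalog, 'PL_VECTOR_QUERY', source_cube)

    # Taylor-style run: split the extraction per trading desk and join the Credit
    # mapper for the Madrid unit to resolve MUREX instruments. With a single risk
    # class, the IndexError branch fills risk_class2 with 'DummyRiskClass'.
    mdx_parameters = {'AsOfDate': '2024-01-31', 'Country': 'MAD'}
    loader.extract(mdx_parameters, split_by_desk=True, risk_classes=['Credit Delta'])
    loader.transform(join_with_mapper='CR', unit='MAD')
    return loader.transformed_df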