import logging
import os

from app.base.Constantes import PLVECTOR_NUM_DESKS, MAPPER_PREFIX, RUTA_MAPPER_PRE
from app.data_loader.MDXDataLoader import MDXDataLoader
import numpy as np
import pandas as pd

from arq.Constants import GENERAL_BLOCK

logger = logging.getLogger(__name__)


class PLVectorLoader(MDXDataLoader):
    def __init__(self, *args):
        super().__init__(*args)
        self.query_results_list = []
        self.as_of_date = None
        self.mapper_df = None
        self.letras_isin = None
        self.mdx_parameters = None

    def extract(self, mdx_parameters, split_by_desk=False, risk_classes=None, *args, **kwargs):
        self.as_of_date = mdx_parameters['AsOfDate']
        self.mdx_parameters = mdx_parameters
        if risk_classes:  # Only required for Taylor VaR
            try:
                mdx_parameters['risk_class1'] = risk_classes[0]
                mdx_parameters['risk_class2'] = risk_classes[1]
            except IndexError:
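                # Only one risk class was supplied; fill the second slot with a placeholder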
                mdx_parameters['risk_class2'] = 'DummyRiskClass'

        if split_by_desk:
            desk_list = self.get_desk_list(mdx_parameters)
            # desk_list = ['STM', 'Credit BS Madrid', 'SLB Rates'] # Uncomment this to do more relevant tests
            for desk in desk_list:
                mdx_parameters['desk'] = desk
                super().extract(mdx_parameters)
                if self.query_result.empty:
                    logger.warning('Query %s with parameters %s received zero results.', self.mdx_query_name,
                                   self.parameter_map)
                else:
                    self.query_results_list.append(self.query_result.reset_index())
            # Combine the per-desk results; fall back to an empty frame when every desk returned no rows
            self.query_result = pd.concat(self.query_results_list) if self.query_results_list else pd.DataFrame()
        else:
            super().extract(mdx_parameters)

    def read_mapper_from_file(self, join_with_mapper, unit, keep_df=True):
        """
        Parameters
        ----------
        join_with_mapper = IR, CR
        unit = MAD, SLB
        """
        if unit == 'MAD':
            books = {'BOA': 'BOADILLA', 'OFF': 'OFFSHORE'}
            separator = '#'
        elif unit == 'SLB':
            books = {'SLB': 'SLB'}
            separator = '#'
        else:
            raise NotImplementedError
        mapper_list = []
        for book, book_long in books.items():
            filename = '{asset_class}_SHAT_{book_long}_GS_inst_{date}.csv'.format(asset_class=join_with_mapper,
                                                                                  book_long=book_long,
                                                                                  date=self.as_of_date.replace('-', ''))
            mapper_file_path = os.path.join(self.catalog.get_property(GENERAL_BLOCK, MAPPER_PREFIX),
                                            RUTA_MAPPER_PRE.format(data=self.as_of_date, p1=unit, p2=book),
                                            filename)
            mapper_list.append(pd.read_csv(mapper_file_path, delimiter=';', header=None))
        mapper_df = pd.concat(mapper_list)
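        # The Murex instrument sits at a different position of the '#'-separated column 10
        # depending on the asset class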
        if join_with_mapper == 'IR':
            mapper_df[10] = mapper_df[10].str.split(separator).str[-1]
        elif join_with_mapper == 'CR':
            mapper_df[10] = mapper_df[10].str.split(separator).str[-6]
        mapper_df = mapper_df.rename(columns={0: "SecName", 1: "Leg", 10: "MUREX_instrument"})
        mapper_df['SecName_UP'] = mapper_df['SecName'].str.upper()
        mapper_df = mapper_df[['SecName_UP', 'Leg', 'MUREX_instrument']]
        if unit == 'SLB':
            mapper_df['MUREX_instrument'] = self.slb_isin_to_mxinstr(mapper_df)
        if keep_df:
            self.mapper_df = mapper_df
        else:
            return mapper_df

    def slb_isin_to_mxinstr(self, mapper_df: pd.DataFrame):
        """
        Mariner sends ISINs in the MX instrument split in SecName
        This function finds ISINs and replaces them by the actual MX instrument
        """
        query_params = {'AsOfDate': self.as_of_date}
        isin_to_label = MDXDataLoader(self.catalog, 'SLB_ISIN_TO_SELABEL', self.source_cube)
        isin_to_label.extract(query_params)
        isin_to_label.transform()
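        # Drop placeholder rows where either the label or the security code is 'N/A'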
        isin_to_label.transformed_df = isin_to_label.transformed_df[
            (isin_to_label.transformed_df['security_label'] != 'N/A')
            & (isin_to_label.transformed_df['security_code'] != 'N/A')]
        mapper_df_isin = mapper_df.copy()
        mapper_df_isin = mapper_df_isin.merge(isin_to_label.transformed_df, how='left', left_on=['MUREX_instrument'],
                                              right_on=['security_code'])
        mapper_df_isin = mapper_df_isin[['security_label', 'MUREX_instrument']]
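        # Prefer the resolved security_label; fall back to the original MUREX_instrument when there is no match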
        mapper_df_isin['Final'] = mapper_df_isin.bfill(axis=1).iloc[:, 0]
        mapper_df_isin = mapper_df_isin['Final'].rename('MUREX_instrument')
        return mapper_df_isin

    def transform(self, join_with_mapper=None, unit=None, *args, **kwargs):
        """
        Parameters
        ----------
        join_with_mapper = IR, CR
        unit = MAD, SLB
        """
        self.transformed_df = self.query_result.reset_index().copy()
        if len(self.transformed_df) == 0:
            logger.warning('Attempting to transform dataframe of size 0. Skipping.')
            return
        self.transformed_df['OutputCurrency'] = 'EUR'
        self.transformed_df["PnLVector.SUM"] = self.transformed_df["PnLVector.SUM"].apply(lambda x: np.array(x))
        self.transformed_df = self.transformed_df.rename(columns={"PnLVector.SUM": "PnLVectorExpand"})
        if 'PortfolioParent' in list(self.transformed_df.columns):
            self.transformed_df = self.transformed_df.rename(columns={"PortfolioParent": "Portfolio"})
        to_string = ['SecName', "Portfolio"]
        to_numeric = ['Leg']
        for column in to_string:
            self.transformed_df[column] = self.transformed_df[column].astype('string')
        for column in to_numeric:
            self.transformed_df[column] = self.transformed_df[column].astype('int')

        if join_with_mapper:  # Taylor - find MX instrument from mapper file
            self.read_mapper_from_file(join_with_mapper, unit)
            self.get_letras_list(self.mdx_parameters)
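            # Upper-case SecName so the join with the mapper is case-insensitive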
            self.transformed_df['SecName_UP'] = self.transformed_df['SecName'].str.upper()
            self.transformed_df = self.transformed_df.merge(self.mapper_df, how='left', on=['SecName_UP', 'Leg'])
            logger.info('%d secnames matched with %s mapper',
                        len(self.transformed_df[self.transformed_df.MUREX_instrument.notna()]),
                        join_with_mapper)
            self.transformed_df = self.transformed_df.drop(columns=['SecName_UP'])

            # Feed flag to indicate if bond is a Letra del Tesoro (if any)
            if self.letras_isin is not None:
                self.transformed_df['isLetter'] = self.transformed_df['MUREX_instrument'].isin(self.letras_isin)

        else:  # Full Reval - retrieve directly from Underlying
            self.transformed_df['MUREX_instrument'] = self.transformed_df['Underlying2'].astype(str)

    def get_desk_list(self, mdx_parameters):
        desk_query = MDXDataLoader(self.catalog, 'TRADING_DESK_SCOPE', self.source_cube)
        desk_query.extract(mdx_parameters)
        desks_df = desk_query.get_query_result()
        desks_df = desks_df['TradingDesk'].unique()

        try:  # Get number of desks from properties, otherwise return all
            max_num_desks = int(self.catalog.get_property(GENERAL_BLOCK, PLVECTOR_NUM_DESKS))
            logger.warning('Limiting load to %d trading desks', max_num_desks)
            return desks_df[0:min(max_num_desks, len(desks_df))]
        except KeyError:
            return desks_df

    def get_letras_list(self, scope):
        # New requirement: add a flag to identify Letras del Tesoro. These can be pinpointed by
        # filtering Credit Delta secnames and finding the ISINs whose Risk Factors contain "LT-".
        # This is quite brittle and likely to break if the historical series change.

        logger.info('Getting ISINs that are Letras del Tesoro. Scope: %s', scope)

        # Get list of Secname+Leg that correspond to letras
        letras_mdx = MDXDataLoader(self.catalog, 'LETRAS_SECNAMES', self.source_cube)
        letras_mdx.extract(scope)
        letras_mdx.transform()
        
        # Retrieve Credit mapper
        mapper_cr = self.read_mapper_from_file('CR', scope['Country'], keep_df=False)

        # Join with mapper to find ISINs (i.e. MX instruments)
        try:
            letras_secnames = letras_mdx.transformed_df.copy()
            letras_secnames['Leg'] = letras_secnames['Leg'].astype('int')
            letras_secnames['SecName_UP'] = letras_secnames['SecName'].str.upper()
            letras_secnames = letras_secnames.merge(mapper_cr, how='left', on=['SecName_UP', 'Leg'])
            self.letras_isin = letras_secnames['MUREX_instrument']
            self.letras_isin = self.letras_isin.drop_duplicates()

            return self.letras_isin
        except KeyError:
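            # The letras query returned no usable columns; no letras in scope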
            return None
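
# Usage sketch (illustrative only). The catalog, source cube, query name and parameter
# values below are placeholders, not part of this module; the real query name and scope
# come from the surrounding application configuration.
#
#     loader = PLVectorLoader(catalog, 'PL_VECTOR_QUERY_NAME', source_cube)
#     loader.extract({'AsOfDate': '2024-01-31', 'Country': 'MAD'}, split_by_desk=True)
#     loader.transform(join_with_mapper='IR', unit='MAD')   # Taylor: join with the IR mapper files
#     pl_vectors = loader.transformed_df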