Untitled
unknown
plain_text
2 months ago
15 kB
3
Indexable
from arq.CubeConfigurer import CubeConfigurer
from app.base.Constantes import *
from app.GerStructure import create_ger_hierarchies
import atoti as tt
import logging

logger = logging.getLogger(__name__)


def var99_measure(vector):
    """Return the lower-tail quantile of ``vector`` at level ``1 - 0.99``.

    The quantile is computed in exclusive mode with linear interpolation.
    """
    return tt.array.quantile(vector, 1 - 0.99, mode='exc', interpolation='linear')


def vae99_measure(vector):
    """Return the upper-tail quantile of ``vector`` at level ``0.99``.

    Same mode and interpolation as ``var99_measure``, opposite tail.
    """
    return tt.array.quantile(vector, 0.99, mode='exc', interpolation='linear')


def correlation(x, y):
    """Return the Pearson correlation between the array measures ``x`` and ``y``.

    Computed as ``sum(dx * dy) / sqrt(sum(dx * dx) * sum(dy * dy))`` where
    ``dx`` and ``dy`` are the deviations of each vector from its own mean.

    NOTE(review): there is no guard for a zero denominator (constant vector);
    the result in that case is whatever the engine yields for the division.
    """
    # Build each deviation expression once instead of repeating it per term.
    x_dev = x - tt.array.mean(x)
    y_dev = y - tt.array.mean(y)
    numer = tt.array.sum(x_dev * y_dev)
    x_sq = tt.array.sum(x_dev * x_dev)
    y_sq = tt.array.sum(y_dev * y_dev)
    denom = tt.math.sqrt(x_sq * y_sq)
    return numer / denom


class Cube1(CubeConfigurer):
    """Cube configuration declaring hierarchies and measures for "plsec_store".

    Relies on ``self.h``, ``self.l``, ``self.m`` and ``self.data_store``, which
    are not defined in this file (presumably provided by ``CubeConfigurer``).
    """

    # Class-level list; not referenced anywhere else in this file.
    scenarios = []

    def __init__(self):
        # NOTE(review): the meaning of the two arguments is defined by
        # CubeConfigurer; presumably base store name and cube creation mode.
        super().__init__("plsec_store", "auto")

    def create_hierarchies(self):
        """Configure the slicing hierarchy and delegate to the GER hierarchies."""
        # Define slicing hierarchies
        logger.info('Configuring slicing hierarchies')
        self.h['AsOfDate'].slicing = True
        self.l['AsOfDate'].order = tt.NaturalOrder(ascending=False)

        # Scenario set should be slicing but SLB and MAD use the same calendar, so not needed
        # self.h['Scenario Set'].slicing = True

        logger.info('Available hierarchies: %s', list(self.h))
        create_ger_hierarchies(self)

    def create_meassures(self):
        """Declare every measure of the cube.

        The (misspelled) method name is kept as is because it is part of the
        class interface. The helpers are called in a fixed order: each group
        reads measures registered by the groups before it.
        """
        self._create_bond_pricer_measures()
        self._create_sensitivity_measures()
        self._create_nominal_measures()
        self._create_pnl_vector_measures()
        self._create_yield_measures()
        self._create_zspread_measures()
        self._create_incremental_measures()
        self._create_scaling_measures()

        # RNIV Z-Spread = VaR PLTOT - VaR Incr
        self.m['RNIV Z-Spread'] = self.m["VaR 99% PLTOT"] - self.m['VaR 99% Incremental']

        logger.info('Available measures: %s', list(self.m))

    def _create_bond_pricer_measures(self):
        """Expose each bond pricer column as a single-value measure."""
        for measure in BOND_PRICER_STORE_MEASURES:
            self.m[measure] = tt.agg.single_value(
                self.data_store.get_table(BOND_PRICER_STORE)[measure])
            self.m[measure].folder = MEASURE_FOLDER_BOND_PRICER

    def _create_sensitivity_measures(self):
        """Create the signed and absolute sensitivity measures."""
        # Sensitivities
        sensi_table = self.data_store.get_table(SENSITIVITIES_STORE)
        scope_sensi = tt.OriginScope(self.l['MUREX_instrument'], self.l['Portfolio'])
        sensitivity_list = ["CR Delta", "IR Delta", "INF Delta", "FX Delta"]
        for sensi in sensitivity_list:
            hidden_name = '_' + sensi
            self.m[hidden_name] = tt.agg.single_value(sensi_table[sensi])
            self.m[hidden_name].visible = False
            self.m[sensi] = tt.agg.sum(self.m[hidden_name], scope=scope_sensi)
            self.m[sensi].folder = MEASURE_FOLDER_SENSITIVITIES

        # Absolute sensitivities - compute abs(sensi) on Instrument level and aggregate from there
        for sensi in sensitivity_list:
            abs_name = 'Abs. ' + sensi
            self.m[abs_name] = tt.agg.sum(tt.math.abs(self.m[sensi]),
                                          scope=tt.OriginScope(self.l['MUREX_instrument']))
            self.m[abs_name].folder = MEASURE_FOLDER_SENSITIVITIES_ABS

    def _create_nominal_measures(self):
        """Create nominal measures (native, USD, EUR) and their absolute versions."""
        # Nominal
        scope_nominals = tt.OriginScope(self.l['MUREX_instrument'], self.l['Portfolio'],
                                        self.l['NominalUnit'])
        self.m['_Nominal'] = tt.agg.sum(
            self.data_store.get_table(NOMINALES_STORE)["Net Nominal Native"])
        self.m['_Nominal'].folder = MEASURE_FOLDER_TECHNICAL
        nominal = self.m['_Nominal']  # / self.m['contributors.COUNT']
        self.m['Nominal (Native)'] = tt.agg.sum(nominal, scope=scope_nominals)

        # FX Rate conversion
        fx_table = self.data_store.get_table(FXRATES_STORE)
        self.m['_RateToUSD'] = tt.agg.single_value(fx_table['FxRate'])
        self.m['_RateToUSD'].visible = False
        # Get rate for any currency to USD. If USD-USD, then it is 1.
        self.m['RateToUSD'] = tt.where(self.m['_RateToUSD'] > 0, self.m['_RateToUSD'], 1)
        # Get EUR-USD directly from FXRates table
        self.m['RateEURUSD'] = tt.lookup(
            fx_table['FxRate'],
            (fx_table['AsOfDate'] == self.l['AsOfDate'])
            & (fx_table['MarketDataSet'] == 'ES')
            & (fx_table['CounterCcy'] == 'EUR')
            & (fx_table['BaseCcy'] == 'USD'))
        self.m['RateToUSD'].folder = MEASURE_FOLDER_NOMINAL
        self.m['RateEURUSD'].folder = MEASURE_FOLDER_NOMINAL

        # Convert nominals
        nominal_usd = nominal * self.m['RateToUSD']
        nominal_eur = nominal_usd * self.m['RateEURUSD']
        self.m['Nominal (USD)'] = tt.agg.sum(nominal_usd, scope=scope_nominals)
        self.m['Nominal (EUR)'] = tt.agg.sum(nominal_eur, scope=scope_nominals)
        for name in ('Nominal (Native)', 'Nominal (USD)', 'Nominal (EUR)'):
            self.m[name].folder = MEASURE_FOLDER_NOMINAL

        # Absolute nominals
        ## how much nominal is covered by this Z-spread vector, how much is left
        ## out (Z-spread vector not defined)
        ## ratio = nominal out / total nominal
        scope_abs_nominals = tt.OriginScope(self.l['MUREX_instrument'], self.l['NominalUnit'])
        self.m['Absolute Nominal (Native)'] = tt.agg.sum(tt.math.abs(nominal),
                                                         scope=scope_abs_nominals)
        self.m['Absolute Nominal (USD)'] = tt.agg.sum(tt.math.abs(nominal_usd),
                                                      scope=scope_abs_nominals)
        self.m['Absolute Nominal (EUR)'] = tt.agg.sum(tt.math.abs(nominal_eur),
                                                      scope=scope_abs_nominals)
        for name in ('Absolute Nominal (Native)', 'Absolute Nominal (USD)',
                     'Absolute Nominal (EUR)'):
            self.m[name].folder = MEASURE_FOLDER_NOMINAL_ABS

    def _create_pnl_vector_measures(self):
        """Create the PLTOT and PROD P&L vector measures and their 99% VaR."""
        self.m["pltot_vector.VALUE"] = tt.agg.single_value(
            self.data_store.get_table(PLTOT_STORE)["PltotVectorExpand"])
        self.m["pltot_vector.VALUE"].visible = False
        self.m["PLTOTVectorExpand.SUM"] = self.total_scn(
            tt.agg.sum(self.m["pltot_vector.VALUE"],
                       scope=tt.OriginScope(self.l['Portfolio'], self.l['Container'])))
        self.m["PLTOTVectorExpand.SUM"].folder = MEASURE_FOLDER_TECHNICAL

        # NOTE(review): "PnLVectorExpand.SUM" / ".MEAN" are not defined in this
        # file; presumably they are created automatically for the base store.
        self.m["PnLVectorExpand.SUM"].folder = MEASURE_FOLDER_TECHNICAL
        self.m["PnLVectorExpand.MEAN"].visible = False

        self.m["VaR 99% PROD"] = var99_measure(self.m["PnLVectorExpand.SUM"])
        self.m["VaR 99% PROD"].folder = MEASURE_FOLDER_PLVECTOR_PROD
        self.m["VaR 99% PLTOT"] = var99_measure(self.m["PLTOTVectorExpand.SUM"])
        self.m["VaR 99% PLTOT"].folder = MEASURE_FOLDER_PLVECTOR_PRODTOT

        # Hidden measure used below as the position to read from each vector.
        self.m['date_index'] = tt.agg.single_value(
            self.data_store.get_table(SCENARIOS_STORE)['index'])
        self.m['date_index'].visible = False

        self.m["PLTOT.PnLVectorExpand"] = self.m["PLTOTVectorExpand.SUM"][self.m["date_index"]]
        self.m["PLTOT.PnLVectorExpand"].folder = MEASURE_FOLDER_PLVECTOR_PRODTOT
        self.m['PROD.PnLVectorExpand'] = self.total_scn(
            self.m['PnLVectorExpand.SUM'])[self.m["date_index"]]
        self.m['PROD.PnLVectorExpand'].folder = MEASURE_FOLDER_PLVECTOR_PROD

    def _create_yield_measures(self):
        """Create yield measures, the yield-based P&L vector and its correlation with PROD."""
        yield_table = self.data_store.get_table(YIELD_STORE)
        self.m['Yield.VALUE'] = tt.agg.single_value(yield_table["YIELD"])
        self.m['YieldReturns.VALUE'] = tt.agg.single_value(yield_table["YieldReturns"])
        self.m['Base Bond'] = tt.agg.single_value(yield_table["BaseBond"])
        self.m['Yield.VALUE'].folder = MEASURE_FOLDER_TECHNICAL
        self.m['YieldReturns.VALUE'].folder = MEASURE_FOLDER_TECHNICAL

        # PL vector by yield
        # NOTE(review): 'Mod Duration' and 'Dirty Price' are not defined in this
        # file; presumably they come from BOND_PRICER_STORE_MEASURES - confirm.
        yield_pnl = (- self.m["YieldReturns.VALUE"] * self.m['Mod Duration']
                     * self.m['Nominal (EUR)'] * self.m['Dirty Price'] / self.m['Base Bond'])
        self.m['YieldPnLVectorExpand.SUM'] = self.total_scn(
            tt.agg.sum(yield_pnl, scope=tt.OriginScope(self.l['MUREX_instrument'])))
        self.m['YieldPnLVectorExpand.SUM'].folder = MEASURE_FOLDER_TECHNICAL

        self.m["YieldVectorExpand"] = self.m["Yield.VALUE"][self.m["date_index"]]
        self.m["YieldReturnsVectorExpand"] = self.m["YieldReturns.VALUE"][self.m["date_index"]]
        self.m['Min. Yield'] = tt.array.min(self.m["Yield.VALUE"])
        self.m['Min. Return'] = tt.array.min(self.m["YieldReturns.VALUE"])
        for name in ('Min. Yield', 'Min. Return', "YieldVectorExpand",
                     "YieldReturnsVectorExpand"):
            self.m[name].folder = MEASURE_FOLDER_YIELD
            self.m[name].formatter = "DOUBLE[0.0000%]"

        self.m['Yield.PnLVectorExpand'] = self.m['YieldPnLVectorExpand.SUM'][self.m["date_index"]]
        self.m['Yield.PnLVectorExpand'].folder = MEASURE_FOLDER_PLVECTOR_YIELD
        self.m['VaR 99% Yield'] = var99_measure(self.m['YieldPnLVectorExpand.SUM'])
        self.m['VaR 99% Yield'].folder = MEASURE_FOLDER_PLVECTOR_YIELD

        # Correlation between PROD and YIELD
        self.m['Correlation (Prod; Yield)'] = correlation(self.m['PnLVectorExpand.SUM'],
                                                          self.m['YieldPnLVectorExpand.SUM'])
        self.m['Correlation (Prod; Yield)'].folder = 'Correlation'

    def _create_zspread_measures(self):
        """Create the Z-spread P&L vectors (raw and CR-Delta-normalised) and tail measures."""
        instrument_scope = tt.OriginScope(self.l['MUREX_instrument'])

        # Z-spread measures: PL yield - PLSEC, only if both are defined
        self.m['_ZSpreadPnLVectorExpand'] = tt.where(
            ((~self.m["YieldPnLVectorExpand.SUM"].isnull())
             & (~self.m["PnLVectorExpand.SUM"].isnull())),
            self.m["YieldPnLVectorExpand.SUM"] - self.m["PnLVectorExpand.SUM"])
        self.m['ZSpreadPnLVectorExpand.SUM'] = self.total_scn(
            tt.agg.sum(self.m['_ZSpreadPnLVectorExpand'], scope=instrument_scope))
        # The vector is divided by (CR Delta + 0.01) wherever CR Delta is defined.
        self.m['ZSpreadNormPnLVectorExpand.SUM'] = self.total_scn(
            tt.agg.sum(
                tt.where((~self.m['CR Delta'].isnull()),
                         self.m['_ZSpreadPnLVectorExpand'] / (self.m['CR Delta'] + 0.01)),
                scope=tt.OriginScope(self.l['MUREX_instrument'])))

        self.m['ZSpread.PnLVectorExpand'] = \
            self.m['ZSpreadPnLVectorExpand.SUM'][self.m["date_index"]]
        self.m['ZSpreadNorm.PnLVectorExpand'] = \
            self.m['ZSpreadNormPnLVectorExpand.SUM'][self.m["date_index"]]
        self.m['VaR 99% ZSpread'] = var99_measure(self.m['ZSpreadPnLVectorExpand.SUM'])
        self.m['VaR 99% ZSpread Norm'] = var99_measure(self.m['ZSpreadNormPnLVectorExpand.SUM'])
        self.m['VaE 99% ZSpread Norm'] = vae99_measure(self.m['ZSpreadNormPnLVectorExpand.SUM'])
        # Larger, in absolute value, of the two tail quantiles.
        self.m['Worst Tail ZSpread Norm'] = tt.math.max(
            tt.math.abs(self.m['VaE 99% ZSpread Norm']),
            tt.math.abs(self.m['VaR 99% ZSpread Norm']))

        for name in ('_ZSpreadPnLVectorExpand', 'ZSpreadPnLVectorExpand.SUM',
                     'ZSpreadNormPnLVectorExpand.SUM'):
            self.m[name].folder = MEASURE_FOLDER_TECHNICAL
        for name in ('ZSpread.PnLVectorExpand', 'ZSpreadNorm.PnLVectorExpand',
                     'VaR 99% ZSpread', 'VaR 99% ZSpread Norm', 'VaE 99% ZSpread Norm',
                     'Worst Tail ZSpread Norm'):
            self.m[name].folder = MEASURE_FOLDER_PLVECTOR_ZSPREAD

    def _create_incremental_measures(self):
        """Create the incremental (PLTOT + Z-spread) P&L vector and its 99% VaR."""
        # Incremental measures: PLTOT + Z-spread
        self.m['IncrementalPnLVectorExpand.SUM'] = (
            self.m['ZSpreadPnLVectorExpand.SUM'] + self.m["PLTOTVectorExpand.SUM"])
        self.m['Incremental.PnLVectorExpand'] = \
            self.m['IncrementalPnLVectorExpand.SUM'][self.m["date_index"]]
        self.m['VaR 99% Incremental'] = var99_measure(self.m['IncrementalPnLVectorExpand.SUM'])

        self.m['IncrementalPnLVectorExpand.SUM'].folder = MEASURE_FOLDER_TECHNICAL
        self.m['Incremental.PnLVectorExpand'].folder = MEASURE_FOLDER_INCREMENTAL
        self.m['VaR 99% Incremental'].folder = MEASURE_FOLDER_INCREMENTAL

    def _create_scaling_measures(self):
        """Split absolute nominal and absolute CR Delta by Z-spread vector availability.

        For each quantity, "IN" sums it over instruments whose Z-spread vector
        is defined, "OUT" over those where it is null, and "TOTAL" takes the
        total along the 'SCIB BOA&SLB Level 1' hierarchy.
        """
        # Rescaling of sensitivities
        quantities = (
            ('Abs. Nominal', 'Absolute Nominal (EUR)'),
            ('Abs. CR Delta', 'Abs. CR Delta'),
        )
        for label, source in quantities:
            self.m[label + ' IN (EUR)'] = tt.agg.sum(
                tt.where((~self.m['_ZSpreadPnLVectorExpand'].isnull()), self.m[source], 0),
                scope=tt.OriginScope(self.l['MUREX_instrument']))
            self.m[label + ' OUT (EUR)'] = tt.agg.sum(
                tt.where((self.m['_ZSpreadPnLVectorExpand'].isnull()), self.m[source], 0),
                scope=tt.OriginScope(self.l['MUREX_instrument']))
            self.m[label + ' TOTAL (EUR)'] = tt.total(self.m[source],
                                                      self.h['SCIB BOA&SLB Level 1'])

        # self.m['Nominal ratio'] = self.m['Abs. Nominal OUT (EUR)'] / self.m['Abs. Nominal TOTAL (EUR)']
        # self.m['Scaled sensitivity'] = self.m['Nominal ratio']

        for label, _ in quantities:
            for suffix in (' IN (EUR)', ' OUT (EUR)', ' TOTAL (EUR)'):
                self.m[label + suffix].folder = MEASURE_FOLDER_SCALING
        # self.m['Nominal ratio'].folder = MEASURE_FOLDER_SCALING
        # self.m['Scaled sensitivity'].folder = MEASURE_FOLDER_SCALING

    # PLVector FR
    def total_scn(self, vector):
        """Return the total of ``vector`` along the "Scenario" hierarchy."""
        return tt.total(vector, self.h["Scenario"])
Editor is loading...
Leave a Comment