Untitled
unknown
plain_text
12 days ago
25 kB
2
Indexable
import pandas as pd import numpy as np from datetime import datetime from dateutil.relativedelta import relativedelta from scipy.stats import zscore ############################################### # 1. Data Loading (Filtering Out Weekends) ############################################### def load_price_data(filepath): """ Load historical prices from an Excel file. Assumes that the first column is dates and the remaining columns are tickers. Removes weekend data. """ df = pd.read_excel(filepath, sheet_name="Sheet2", index_col=0) df.index = pd.to_datetime(df.index) df = df.sort_index() # Filter out weekends (Saturday=5, Sunday=6) df = df[df.index.dayofweek < 5] return df def load_macro_data(filepath): """ Load macro indicators from an Excel file. Loads VIX and VIX3M from 'Eq2' sheet, LF98TRUU from 'FI2' sheet. Removes weekend data. Also computes slopes for selected macro indicators. """ # VIX data vix_data = pd.read_excel(filepath, sheet_name='Eq2', index_col=0, parse_dates=True, usecols=[0, 4, 5, 6, 7, 8]) vix_data.columns = ['VIX', 'VIX3M', 'UX1', 'UX2', 'UX3'] vix_data = vix_data[vix_data.index.dayofweek < 5] cdx_data = pd.read_excel(filepath, sheet_name='FI2', index_col=0, parse_dates=True, usecols=[0, 2]) cdx_data.columns = ['LF98TRUU'] cdx_data = cdx_data[cdx_data.index.dayofweek < 5] combined_data = pd.concat([vix_data, cdx_data], axis=1) combined_data = combined_data.fillna(method='ffill').fillna(method='bfill') combined_data = combined_data.sort_index() return combined_data ############################################### # 2. Helper: Observation Dates (Monthly) ############################################### def get_observation_dates(prices, start_date, end_date, rebalance_period): dates = [] current_date = start_date while current_date < end_date: candidate_date = (current_date + relativedelta(months=rebalance_period)).replace(day=1) while candidate_date not in prices.index: candidate_date += pd.Timedelta(days=1) if candidate_date.month != (current_date + relativedelta(months=rebalance_period)).month: candidate_date = None break if candidate_date is None or candidate_date > end_date: break dates.append(candidate_date) current_date = candidate_date return dates ############################################### # 3. Portfolio Initialization ############################################### def initialize_portfolio(prices, date, tickers, initial_aum): portfolio = {} allocation = initial_aum / len(tickers) for ticker in tickers: price = prices.loc[date, ticker] portfolio[ticker] = allocation / price return portfolio ############################################### # 4. Lookback Metric Computation ############################################### def compute_lookback_metric(prices, current_date, ticker, lookback_period, metric_type='simple'): prices = prices.sort_index() mask = prices.index < current_date prev_date = prices.index[mask][-1] lookback_date = prev_date - relativedelta(months=lookback_period) current_price = prices[ticker].asof(prev_date) lookback_price = prices[ticker].asof(lookback_date) if pd.isna(current_price) or pd.isna(lookback_price): raise ValueError(f"Missing price data for {ticker} on {prev_date} or {lookback_date}.") if metric_type == 'simple': metric = (current_price / lookback_price) - 1 elif metric_type == 'sma': window = prices[ticker].loc[lookback_date:current_date] if window.empty: raise ValueError(f"No price data for {ticker} between {lookback_date} and {current_date}.") sma = window.mean() metric = (current_price - sma) / sma else: raise ValueError("Invalid metric type. Choose 'simple' or 'sma'.") return metric ############################################### # 5. Ranking Assets by Momentum ############################################### def rank_assets(prices, current_date, tickers, lookback_period, metric_type): metrics = {} for ticker in tickers: metric = compute_lookback_metric(prices, current_date, ticker, lookback_period, metric_type) metrics[ticker] = metric sorted_tickers = sorted(metrics, key=metrics.get, reverse=True) ranks = {ticker: rank+1 for rank, ticker in enumerate(sorted_tickers)} return sorted_tickers, ranks, metrics ############################################### # 6. Compute Current Portfolio Value ############################################### def compute_portfolio_value(portfolio, prices, current_date): value = 0 for ticker, quantity in portfolio.items(): price = prices.loc[current_date, ticker] value += quantity * price return value ############################################### # 7. Rebalance the Momentum Portfolio ############################################### def rebalance_portfolio(portfolio, prices, current_date, tickers, sorted_tickers, internal_rebalance_ratios, rebalance_ratio): mask = prices.index < current_date prev_date = prices.index[mask][-1] prev_prices = prices.loc[prev_date] curr_prices = prices.loc[current_date] portfolio_value = sum(portfolio[ticker] * prev_prices[ticker] for ticker in tickers) rebalance_amount = portfolio_value * rebalance_ratio target_trades = {ticker: rebalance_amount * internal_rebalance_ratios[i] for i, ticker in enumerate(sorted_tickers)} total_sold = 0 actual_trades = {} for ticker, target_trade in target_trades.items(): if target_trade < 0: available_notional = portfolio[ticker] * curr_prices[ticker] sell_target = abs(target_trade) actual_sell = min(available_notional, sell_target) actual_trades[ticker] = -actual_sell total_sold += actual_sell else: actual_trades[ticker] = 0 total_buy_target = sum(t for t in target_trades.values() if t > 0) if total_buy_target > 0: for ticker, target_trade in target_trades.items(): if target_trade > 0: proportion = target_trade / total_buy_target buy_amount = total_sold * proportion actual_trades[ticker] = buy_amount new_portfolio = portfolio.copy() for ticker, trade_notional in actual_trades.items(): execution_price = curr_prices[ticker] qty_change = trade_notional / execution_price new_portfolio[ticker] += qty_change return new_portfolio, actual_trades, portfolio_value def adjust_overweight(portfolio, prices, current_date, sorted_tickers, threshold=0.70): mask = prices.index < current_date prev_date = prices.index[mask][-1] prev_prices = prices.loc[prev_date] curr_prices = prices.loc[current_date] portfolio_value = sum(portfolio[ticker] * prev_prices[ticker] for ticker in portfolio) weights = {ticker: (portfolio[ticker] * prev_prices[ticker]) / portfolio_value for ticker in portfolio} new_portfolio = portfolio.copy() for overweight in portfolio: if weights[overweight] > threshold: extra_weight = weights[overweight] - threshold extra_value = extra_weight * portfolio_value execution_price_over = curr_prices[overweight] qty_reduce = extra_value / execution_price_over new_portfolio[overweight] -= qty_reduce remaining_value = extra_value for candidate in sorted_tickers: if candidate == overweight: continue candidate_value = new_portfolio[candidate] * curr_prices[candidate] candidate_weight = candidate_value / portfolio_value if candidate_weight < threshold: capacity = (threshold - candidate_weight) * portfolio_value allocation = min(remaining_value, capacity) qty_add = allocation / curr_prices[candidate] new_portfolio[candidate] += qty_add remaining_value -= allocation if remaining_value <= 0: break return new_portfolio ############################################### # 8. Risk Signal Function ############################################### def generate_risk_signals(current_date, macro_data, prices, portfolio): # Get the most recent macro data before current_date available_macro_dates = macro_data.index[macro_data.index < current_date] ref_date = available_macro_dates[-1] if len(available_macro_dates) > 0 else current_date # Retrieve key metrics (simplified for this example) ux1_ux2_spread = macro_data['UX1'].asof(ref_date) - macro_data['UX2'].asof(ref_date) slope_5d = macro_data['UX1'].asof(ref_date) # placeholder slope_10d = macro_data['UX1'].asof(ref_date) # placeholder slope_15d = macro_data['UX1'].asof(ref_date) # placeholder vix_mom_signal = macro_data['UX3'].asof(ref_date) # placeholder # Determine signal based on momentum signal vix_signal = 'risk-off' if vix_mom_signal > 0 else 'risk-on' # Set target allocation based on signal (example logic) if vix_mom_signal == 3: vix_target_alloc = 0.6 elif vix_mom_signal == 1: vix_target_alloc = 0.8 else: vix_target_alloc = 1.0 # Get weights for SPY and HYG using previous day's portfolio mask = prices.index < current_date prev_date = prices.index[mask][-1] mom_value = compute_portfolio_value(portfolio, prices, prev_date) spy_weight = (portfolio.get('SPY US Equity', 0) * prices.loc[prev_date, 'SPY US Equity']) / mom_value if mom_value > 0 else 0 hyg_weight = (portfolio.get('HYG US Equity', 0) * prices.loc[prev_date, 'HYG US Equity']) / mom_value if mom_value > 0 else 0 note = "" # Override logic based on SPY+HYG weights if vix_signal == 'risk-off': if (spy_weight + hyg_weight) < 0.40: regime = 'risk-on' target_alloc = 1 note = "Forced regime to risk-on & target alloc 100% due to SPY+HYG < 40%" else: regime = 'risk-off' target_alloc = vix_target_alloc else: regime = 'risk-on' target_alloc = 1.0 vix_params = { 'UX1_UX2_Spread': ux1_ux2_spread, 'slope_5d': slope_5d, 'slope_10d': slope_10d, 'slope_15d': slope_15d, 'vix_mom_signal': vix_mom_signal, 'vix_target_alloc': vix_target_alloc, 'spy_weight': spy_weight, 'hyg_weight': hyg_weight } return regime, target_alloc, vix_signal, note, vix_params ############################################### # 9. Helper Functions for Cash ############################################### def invest_cash_into_portfolio(portfolio, prices, current_date, cash_qty, cash_ticker): cash_price = prices.loc[current_date, cash_ticker] cash_available = cash_qty * cash_price if cash_available <= 0: return portfolio, cash_qty, f"No {cash_ticker} to reinvest." mask = prices.index < current_date prev_date = prices.index[mask][-1] mom_value = compute_portfolio_value(portfolio, prices, current_date) new_portfolio = portfolio.copy() for ticker in portfolio: prev_price = prices.loc[prev_date, ticker] total_qty = (portfolio[ticker] * (mom_value + cash_available)) / mom_value if mom_value > 0 else 1/len(portfolio) new_portfolio[ticker] = total_qty return new_portfolio, 0.0, f"Reinvested {cash_available:,.2f} from {cash_ticker}" def allocate_cash_from_portfolio(portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker): # Use previous day's snapshot to compute total AUM mask = prices.index < current_date prev_date = prices.index[mask][-1] curr_value = compute_portfolio_value(portfolio, prices, prev_date) cash_price_prev = prices.loc[prev_date, cash_ticker] cash_equiv_prev = cash_qty * cash_price_prev total_aum = curr_value + cash_equiv_prev # Determine desired cash based on target allocation desired_cash = (1 - target_alloc) * total_aum cash_price = prices.loc[current_date, cash_ticker] current_cash = cash_qty * cash_price new_portfolio = portfolio.copy() new_cash_qty = cash_qty note = "" # If excess cash exists, scale up positions to reach target allocation if desired_cash < current_cash: current_portfolio_value = sum(portfolio[ticker] * prices.loc[current_date, ticker] for ticker in portfolio) desired_risk_allocation = total_aum * target_alloc scaling_factor = desired_risk_allocation / current_portfolio_value for ticker in portfolio: new_portfolio[ticker] = portfolio[ticker] * scaling_factor excess_cash = current_cash - desired_cash new_cash_qty -= excess_cash / cash_price note = f"Deployed {excess_cash:,.2f} from {cash_ticker} into portfolio" # If cash is insufficient, sell some positions to raise cash elif desired_cash > current_cash: cash_to_raise = desired_cash - current_cash curr_value = compute_portfolio_value(portfolio, prices, current_date) for ticker in portfolio: price = prices.loc[current_date, ticker] sell_ratio = (cash_to_raise / curr_value) qty_to_sell = portfolio[ticker] * sell_ratio new_portfolio[ticker] -= qty_to_sell new_cash_qty += cash_to_raise / cash_price note = f"Raised {cash_to_raise:,.2f} into {cash_ticker}" return new_portfolio, new_cash_qty, note ############################################### # 10. Simulation: Strategy ############################################### def simulate_strategy(prices, macro_data, eq_tickers, fi_tickers, alts_tickers, initial_aum, start_date, end_date, rebalance_period, rebalance_ratio, lookback_period, metric_type, internal_rebalance_ratios, cash_ticker='SHV US Equity'): all_tickers = eq_tickers + fi_tickers + alts_tickers momentum_tickers = [t for t in all_tickers if t != cash_ticker] monthly_dates = get_observation_dates(prices, start_date, end_date, rebalance_period) daily_dates = prices.index.sort_values() daily_dates = daily_dates[(daily_dates >= start_date) & (daily_dates <= end_date)] macro_data = macro_data.copy() macro_data['UX1_UX2_Spread'] = macro_data['UX1'] - macro_data['UX2'] macro_data['UX1_UX2_SpreadEMA_12'] = macro_data["UX1_UX2_Spread"].ewm(span=12, adjust=False).mean() macro_data['Slope5D'] = macro_data["UX1_UX2_SpreadEMA_12"].diff(5) macro_data['Slope10D'] = macro_data["UX1_UX2_SpreadEMA_12"].diff(10) macro_data['Slope15D'] = macro_data["UX1_UX2_SpreadEMA_12"].diff(15) macro_data['LF98TRUU_EMA_12'] = macro_data["LF98TRUU"].ewm(span=12, adjust=False).mean() macro_data['Slope5D_HY'] = macro_data["LF98TRUU_EMA_12"].diff(5) macro_data['Slope10D_HY'] = macro_data["LF98TRUU_EMA_12"].diff(10) macro_data['Slope15D_HY'] = macro_data["LF98TRUU_EMA_12"].diff(15) macro_data["Signal_Momentum"] = ( (macro_data['Slope5D'] > 0).astype(int) * 2 - 1 + (macro_data['Slope10D'] > 0).astype(int) * 2 - 1 + (macro_data['Slope15D'] > 0).astype(int) * 2 - 1 ) macro_data["Signal_Momentum_HY"] = ( (macro_data['Slope5D_HY'] > 0).astype(int) * 2 - 1 + (macro_data['Slope10D_HY'] > 0).astype(int) * 2 - 1 + (macro_data['Slope15D_HY'] > 0).astype(int) * 2 - 1 ) available_dates = prices.index[prices.index >= start_date] if len(available_dates) == 0: raise ValueError("No trading dates found after the specified start_date") start_date = available_dates[0] portfolio = initialize_portfolio(prices, start_date, momentum_tickers, initial_aum) cash_qty = 0.0 current_regime, target_alloc = 'risk-on', 1.0 previous_regime, previous_target_alloc = current_regime, target_alloc prev_total_aum = initial_aum results = [] for current_date in daily_dates: # Save snapshot of yesterday's portfolio and cash prev_portfolio = portfolio.copy() prev_cash_qty = cash_qty daily_note = "No adjustment" vix_params = {} ranks, metrics, trades = {}, {}, {} # --- Generate regime & allocation --- regime, target_alloc, vix_signal, signal_note, vix_params = generate_risk_signals( current_date, macro_data, prices, portfolio) current_regime = regime daily_note = signal_note # --- Monthly Rebalancing Logic --- if current_date in monthly_dates: sorted_tickers, ranks, metrics = rank_assets(prices, current_date, momentum_tickers, lookback_period, metric_type) temp_portfolio, trades, pre_rebalance_value = rebalance_portfolio( portfolio, prices, current_date, momentum_tickers, sorted_tickers, internal_rebalance_ratios, rebalance_ratio) temp_portfolio = adjust_overweight(temp_portfolio, prices, current_date, sorted_tickers, threshold=0.70) temp_value = compute_portfolio_value(temp_portfolio, prices, current_date) spy_temp = temp_portfolio.get('SPY US Equity', 0) * prices.loc[current_date, 'SPY US Equity'] hyg_temp = temp_portfolio.get('HYG US Equity', 0) * prices.loc[current_date, 'HYG US Equity'] combined_weight = (spy_temp + hyg_temp) / temp_value if temp_value > 0 else 0 if (current_regime == 'risk-off') and (combined_weight < 0.40): current_regime = 'risk-on' target_alloc = 1 daily_note += " | Monthly: Forced risk-on due to SPY+HYG weight < 40% after simulation." portfolio = temp_portfolio if target_alloc != previous_target_alloc: total_aum_temp = compute_portfolio_value(portfolio, prices, current_date) + cash_qty * prices.loc[current_date, cash_ticker] desired_value = target_alloc * total_aum_temp actual_value = compute_portfolio_value(portfolio, prices, current_date) if abs(actual_value - desired_value) / total_aum_temp > 0.001: portfolio, cash_qty, note_update = allocate_cash_from_portfolio( portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker) daily_note += " | Monthly: " + note_update else: daily_note += " | Monthly: Skipped cash adjustment due to minor deviation." else: daily_note += " | Monthly" # --- Daily Regime Handling: Only if NOT monthly date --- elif (previous_regime != current_regime) or (current_regime == 'risk-off' and target_alloc != previous_target_alloc): if previous_regime == 'risk-off' and current_regime == 'risk-on' and cash_qty > 0: portfolio, cash_qty, note_update = invest_cash_into_portfolio( portfolio, prices, current_date, cash_qty, cash_ticker) daily_note += " | " + note_update elif (previous_regime == 'risk-on' and current_regime == 'risk-off') or \ (current_regime == 'risk-off' and target_alloc != previous_target_alloc): portfolio, cash_qty, note_update = allocate_cash_from_portfolio( portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker) daily_note += " | " + note_update previous_regime = current_regime previous_target_alloc = target_alloc # --- Log AUM and daily stats using yesterday's snapshot (prev_portfolio, prev_cash_qty) --- cash_price = prices.loc[current_date, cash_ticker] cash_value = prev_cash_qty * cash_price momentum_value = sum( prev_portfolio[ticker] * prices.loc[current_date, ticker] for ticker in prev_portfolio ) total_aum = momentum_value + cash_value ret = (total_aum - prev_total_aum) / prev_total_aum if prev_total_aum > 0 else 0 prev_total_aum = total_aum row = { 'Total AUM': total_aum, 'Momentum AUM': momentum_value, 'Cash Qty': cash_qty, 'Cash Price': cash_price, 'Cash Value': cash_value, 'Current Regime': current_regime, 'Target Alloc': target_alloc, 'VIX Target': vix_params.get('vix_target_alloc', np.nan), 'VIX Signal': vix_signal, 'Adjustment Note': daily_note, 'Cash Adjustment': 0.0, 'Return': ret, 'Event': 'Monthly Rebalance' if current_date in monthly_dates else 'Daily Check', 'Slope_5D': vix_params.get('slope_5d', np.nan), 'Slope_10D': vix_params.get('slope_10d', np.nan), 'Slope_15D': vix_params.get('slope_15d', np.nan), 'VIX_Mom_Signal': vix_params.get('vix_mom_signal', np.nan), 'Date': current_date } for ticker in momentum_tickers: qty = prev_portfolio.get(ticker, 0) price = prices.loc[current_date, ticker] notional = qty * price weight = notional / momentum_value if momentum_value > 0 else np.nan row[f'qty_{ticker}'] = qty row[f'price_{ticker}'] = price row[f'notional_{ticker}'] = notional row[f'weight_{ticker}'] = weight row[f'rank_{ticker}'] = ranks.get(ticker, np.nan) row[f'metric_{ticker}'] = metrics.get(ticker, np.nan) row[f'trade_{ticker}'] = trades.get(ticker, 0) results.append(row) result_df = pd.DataFrame(results) result_df.set_index('Date', inplace=True) return result_df ############################################### # 11. Restructure Results for Output ############################################### def restructure_results(result_df, momentum_tickers): base_columns = ['Total AUM', 'Momentum AUM', 'Cash Value', 'Cash Qty', 'Cash Price', 'Return', 'Current Regime', 'Target Alloc', 'VIX Target', 'VIX Signal', 'Adjustment Note', 'Event'] vix_columns = ['Slope_5D', 'Slope_10D', 'Slope_15D', 'VIX_Mom_Signal'] price_columns = [f'price_{ticker}' for ticker in momentum_tickers] qty_columns = [f'qty_{ticker}' for ticker in momentum_tickers] notional_columns = [f'notional_{ticker}' for ticker in momentum_tickers] weight_columns = [f'weight_{ticker}' for ticker in momentum_tickers] rank_columns = [f'rank_{ticker}' for ticker in momentum_tickers] metric_columns = [f'metric_{ticker}' for ticker in momentum_tickers] trade_columns = [f'trade_{ticker}' for ticker in momentum_tickers] new_column_order = ( base_columns + price_columns + qty_columns + notional_columns + weight_columns + rank_columns + metric_columns + trade_columns + vix_columns ) existing_columns = [col for col in new_column_order if col in result_df.columns] return result_df[existing_columns] ############################################### # 12. Main – Example Usage ############################################### if __name__ == '__main__': eq_tickers = ['SPY US Equity'] fi_tickers = ['TLT US Equity', 'HYG US Equity'] alts_tickers = ['GLD US Equity', 'IGSB US Equity'] initial_aum = 100e6 start_date = pd.to_datetime('2008-01-01') end_date = pd.to_datetime('2025-02-01') rebalance_period = 1 rebalance_ratio = 0.2 lookback_period = 6 metric_type = 'simple' internal_rebalance_ratios = [0.8, 0.2, 0, -0.2, -0.8] price_filepath = r"\\asiapac.nom\data\MUM\IWM\India_IWM_IPAS\Reet\Momentum Strategy\Codes\Historic Prices.xlsx" macro_filepath = r"\\asiapac.nom\data\MUM\IWM\India_IWM_IPAS\Reet\Momentum Strategy\Momentum Strategy Overlay Data.xlsx" prices = load_price_data(price_filepath) macro_data = load_macro_data(macro_filepath) result_df = simulate_strategy(prices, macro_data, eq_tickers, fi_tickers, alts_tickers, initial_aum, start_date, end_date, rebalance_period, rebalance_ratio, lookback_period, metric_type, internal_rebalance_ratios, cash_ticker='SHV US Equity') pd.set_option('display.float_format', lambda x: f'{x:,.2f}') all_tickers = eq_tickers + fi_tickers + alts_tickers momentum_tickers = [t for t in all_tickers] result_df = restructure_results(result_df, momentum_tickers) # For example, print a summary of the simulation: print(result_df[['Total AUM', 'Momentum AUM', 'Cash Value']].tail())
Editor is loading...
Leave a Comment