Untitled
unknown
plain_text
a month ago
26 kB
3
Indexable
from datetime import datetime

import numpy as np
import pandas as pd
import statsmodels.api as sm
from dateutil.relativedelta import relativedelta
from scipy.stats import zscore
from statsmodels.regression.rolling import RollingOLS

###############################################
# 1. Data Loading (Filtering Out Weekends)
###############################################


def load_price_data(filepath):
    """
    Load historical prices from an Excel file.

    The first column is used as the index and converted to datetimes; the
    remaining columns are kept as-is (the rest of the script treats them as
    one price series per ticker). Rows are sorted by date and rows falling
    on a Saturday or Sunday are removed.

    Returns:
        DataFrame indexed by date, weekdays only, in ascending date order.
    """
    df = pd.read_excel(filepath, index_col=0)
    df.index = pd.to_datetime(df.index)
    df = df.sort_index()
    # Filter out weekends (Saturday=5, Sunday=6)
    df = df[df.index.dayofweek < 5]
    return df


def load_macro_data(filepath):
    """
    Load macro indicators from an Excel workbook.

    Reads three sheets and joins them column-wise on their date index:
      * 'Eq'    -> columns 4 and 5, renamed to 'VIX' and 'VIX3M'
      * 'FI'    -> column 2 (first row skipped), renamed to 'LF98TRUU'
      * 'Macro' -> the first 8 columns (first row skipped), names unchanged
    Weekend rows are dropped from each sheet before joining.

    Returns:
        DataFrame in ascending date order with gaps forward-filled and any
        remaining leading gaps back-filled.
    """
    # VIX data
    vix_data = pd.read_excel(filepath, sheet_name='Eq', index_col=0,
                             parse_dates=True, usecols=[0, 4, 5])
    vix_data.columns = ['VIX', 'VIX3M']
    vix_data = vix_data[vix_data.index.dayofweek < 5]

    # FI data
    cdx_data = pd.read_excel(filepath, sheet_name='FI', index_col=0,
                             parse_dates=True, usecols=[0, 2], skiprows=1)
    cdx_data.columns = ['LF98TRUU']
    cdx_data = cdx_data[cdx_data.index.dayofweek < 5]

    # Macro data (assumed to include other indicators)
    macro_data = pd.read_excel(filepath, sheet_name='Macro', index_col=0,
                               parse_dates=True, usecols=range(8), skiprows=1)
    macro_data = macro_data[macro_data.index.dayofweek < 5]

    combined_data = pd.concat([vix_data, cdx_data, macro_data], axis=1)
    # Sort BEFORE filling so the fills always run in chronological order,
    # whatever order the union of the three sheet indexes came out in.
    combined_data = combined_data.sort_index()
    # ffill()/bfill() replace the deprecated fillna(method=...) spelling.
    # NOTE: the back-fill copies the first available value backwards in time
    # to cover leading gaps.
    combined_data = combined_data.ffill().bfill()
    return combined_data


###############################################
# 2.
# Helper: Observation Dates (Monthly)
###############################################


def get_observation_dates(prices, start_date, end_date, rebalance_period):
    """
    Build the list of rebalance (observation) dates.

    Starting from `start_date`, repeatedly steps `rebalance_period` months
    ahead, snaps to the 1st of that month and then walks forward day by day
    to the first date present in `prices.index`.

    The schedule stops (without raising) as soon as
      * the target month contains no date from `prices.index`, or
      * the next observation date would fall after `end_date`.

    Args:
        prices: DataFrame whose index holds the available trading dates.
        start_date: Timestamp the schedule is anchored on (not itself included).
        end_date: Last admissible observation date (inclusive).
        rebalance_period: Number of months between observations; must be >= 1.

    Returns:
        List of observation dates in ascending order.

    Raises:
        ValueError: if `rebalance_period` is smaller than 1 (the schedule
            could never advance and the loop would not terminate).
    """
    if rebalance_period < 1:
        raise ValueError("rebalance_period must be a positive number of months.")

    step = pd.DateOffset(months=rebalance_period)
    one_day = pd.Timedelta(days=1)
    dates = []
    current_date = start_date
    while current_date < end_date:
        # The target month is fixed for the whole search below.
        target = current_date + step
        candidate_date = target.replace(day=1)
        while candidate_date not in prices.index:
            candidate_date += one_day
            if candidate_date.month != target.month:
                # Ran off the end of the month without finding a trading day.
                candidate_date = None
                break
        if candidate_date is None or candidate_date > end_date:
            break
        dates.append(candidate_date)
        current_date = candidate_date
    return dates


###############################################
# 3. Portfolio Initialization
###############################################


def initialize_portfolio(prices, date, tickers, initial_aum):
    """
    Create an equally weighted starting portfolio.

    `initial_aum` is split evenly across `tickers` and converted to
    quantities using the prices of the last index date strictly before
    `date`.

    Returns:
        Dict mapping ticker -> quantity held.

    Raises:
        ValueError: if `tickers` is empty.
        IndexError: if `prices` has no date before `date`.
    """
    if not tickers:
        raise ValueError("tickers must contain at least one ticker.")

    prev_date = prices.index[prices.index < date][-1]
    allocation = initial_aum / len(tickers)
    return {ticker: allocation / prices.loc[prev_date, ticker] for ticker in tickers}


###############################################
# 4.
# Lookback Metric Computation
###############################################


def compute_lookback_metric(prices, current_date, ticker, lookback_period, metric_type='simple'):
    """
    Compute a momentum metric for one ticker as of the previous close.

    The "current" price is the last available price strictly before
    `current_date`; the reference date lies `lookback_period` months before
    that previous date. Both prices are read with `asof`, i.e. the latest
    non-missing value at or before the requested date.

    metric_type:
        'simple' -> current_price / lookback_price - 1
        'sma'    -> (current_price - mean(window)) / mean(window), where the
                    window runs from the lookback date to the previous date.

    Raises:
        ValueError: on an unknown `metric_type`, when either price is
            missing, or when the SMA window is empty.
        IndexError: if there is no price date before `current_date`.
    """
    if metric_type not in ('simple', 'sma'):
        raise ValueError("Invalid metric type. Choose 'simple' or 'sma'.")

    # Only this ticker's column is needed, so only that column is sorted.
    series = prices[ticker].sort_index()
    prev_date = series.index[series.index < current_date][-1]
    lookback_date = prev_date - pd.DateOffset(months=lookback_period)

    current_price = series.asof(prev_date)
    lookback_price = series.asof(lookback_date)
    if pd.isna(current_price) or pd.isna(lookback_price):
        raise ValueError(f"Missing price data for {ticker} on {prev_date} or {lookback_date}.")

    if metric_type == 'simple':
        return (current_price / lookback_price) - 1

    # The window ends at prev_date: the price on current_date itself must
    # not leak into a signal that is otherwise based on the previous close.
    window = series.loc[lookback_date:prev_date]
    if window.empty:
        raise ValueError(f"No price data for {ticker} between {lookback_date} and {prev_date}.")
    sma = window.mean()
    return (current_price - sma) / sma


###############################################
# 5. Ranking Assets by Momentum
###############################################


def rank_assets(prices, current_date, tickers, lookback_period, metric_type):
    """
    Rank tickers by their lookback metric, best first.

    Returns:
        (sorted_tickers, ranks, metrics) where `sorted_tickers` is ordered by
        descending metric, `ranks` maps ticker -> 1-based rank and `metrics`
        maps ticker -> metric value.
    """
    metrics = {
        ticker: compute_lookback_metric(prices, current_date, ticker, lookback_period, metric_type)
        for ticker in tickers
    }
    sorted_tickers = sorted(metrics, key=metrics.get, reverse=True)
    ranks = {ticker: position for position, ticker in enumerate(sorted_tickers, start=1)}
    return sorted_tickers, ranks, metrics


###############################################
# 6. Compute Current Portfolio Value
###############################################


def compute_portfolio_value(portfolio, prices, current_date):
    """
    Value a {ticker: quantity} portfolio at the prices of `current_date`.

    Returns 0 for an empty portfolio.
    """
    return sum(
        quantity * prices.loc[current_date, ticker]
        for ticker, quantity in portfolio.items()
    )


###############################################
# 7.
# Rebalance the Momentum Portfolio
###############################################


def rebalance_portfolio(portfolio, prices, current_date, tickers, sorted_tickers,
                        internal_rebalance_ratios, rebalance_ratio):
    """
    Shift part of the portfolio from low-ranked to high-ranked tickers.

    The portfolio is valued at the previous date's prices; `rebalance_ratio`
    of that value is the amount to move. The i-th ticker of `sorted_tickers`
    gets a target trade of `amount * internal_rebalance_ratios[i]` (negative
    = sell). Sells are capped at the notional actually held at today's
    prices, and the cash really raised is distributed over the buy targets
    in proportion to their size. All trades execute at `current_date` prices.

    Returns:
        (new_portfolio, actual_trades, portfolio_value) where
        `actual_trades` maps ticker -> signed notional traded and
        `portfolio_value` is the previous-close value used for sizing.
        The input `portfolio` is not modified.
    """
    prev_date = prices.index[prices.index < current_date][-1]
    prev_prices = prices.loc[prev_date]
    curr_prices = prices.loc[current_date]

    portfolio_value = sum(portfolio[ticker] * prev_prices[ticker] for ticker in tickers)
    rebalance_amount = portfolio_value * rebalance_ratio
    target_trades = {
        ticker: rebalance_amount * internal_rebalance_ratios[i]
        for i, ticker in enumerate(sorted_tickers)
    }

    # Sells first: they determine how much cash is available for the buys.
    total_sold = 0
    actual_trades = {}
    for ticker, target_trade in target_trades.items():
        if target_trade < 0:
            available_notional = portfolio[ticker] * curr_prices[ticker]
            actual_sell = min(available_notional, abs(target_trade))
            actual_trades[ticker] = -actual_sell
            total_sold += actual_sell
        else:
            actual_trades[ticker] = 0

    total_buy_target = sum(t for t in target_trades.values() if t > 0)
    if total_buy_target > 0:
        for ticker, target_trade in target_trades.items():
            if target_trade > 0:
                actual_trades[ticker] = total_sold * (target_trade / total_buy_target)

    new_portfolio = portfolio.copy()
    for ticker, trade_notional in actual_trades.items():
        new_portfolio[ticker] += trade_notional / curr_prices[ticker]
    return new_portfolio, actual_trades, portfolio_value


def adjust_overweight(portfolio, prices, current_date, sorted_tickers, threshold=0.70):
    """
    Cap any single position at `threshold` of the portfolio value.

    Weights are measured at the previous date's prices. For every ticker
    above the threshold the excess value is sold at today's price and handed
    to the other tickers in `sorted_tickers` order, each receiving at most
    what brings it up to the threshold (measured at today's prices against
    the previous-close portfolio value).

    Returns:
        A new {ticker: quantity} dict; the input `portfolio` is not modified.
    """
    prev_date = prices.index[prices.index < current_date][-1]
    prev_prices = prices.loc[prev_date]
    curr_prices = prices.loc[current_date]

    portfolio_value = sum(portfolio[ticker] * prev_prices[ticker] for ticker in portfolio)
    weights = {
        ticker: (portfolio[ticker] * prev_prices[ticker]) / portfolio_value
        for ticker in portfolio
    }

    new_portfolio = portfolio.copy()
    for overweight in portfolio:
        if weights[overweight] <= threshold:
            continue
        extra_value = (weights[overweight] - threshold) * portfolio_value
        new_portfolio[overweight] -= extra_value / curr_prices[overweight]

        remaining_value = extra_value
        for candidate in sorted_tickers:
            if candidate == overweight:
                continue
            candidate_value = new_portfolio[candidate] * curr_prices[candidate]
            candidate_weight = candidate_value / portfolio_value
            if candidate_weight < threshold:
                capacity = (threshold - candidate_weight) * portfolio_value
                allocation = min(remaining_value, capacity)
                new_portfolio[candidate] += allocation / curr_prices[candidate]
                remaining_value -= allocation
            if remaining_value <= 0:
                break
    return new_portfolio


###############################################
# 8. New Helper: Rolling Regression-Based SPY Return Signal
###############################################


def compute_regression_signal_rolling(current_date, macro_data, prices, window_size=100,
                                      slope_window=5, threshold=0.0):
    """
    Computes the next-day SPY return signal from a trailing regression.

    Steps:
      1. Restrict macro data and SPY prices to dates <= `current_date`.
      2. Compute TS = VIX - VIX3M and its linear-fit slope over a rolling
         window of `slope_window` rows.
      3. Pair each slope with the SPY return of the following price date.
         Because prices were truncated in step 1, only returns already
         realised by `current_date` are available.
      4. Fit, on the most recent `window_size` pairs:
             SPY_return_next = alpha + beta_pos * max(slope, 0)
                                     + beta_neg * min(slope, 0) + error
      5. Plug the latest slope into the fitted equation.

    Returns:
        (predicted_return, signal) with signal 'risk-off' if the prediction
        is below `threshold`, else 'risk-on'; or (None, None) when there is
        not enough history for either the slope or the regression.
    """
    # Truncating both inputs up front makes look-ahead impossible: the last
    # available row has no "next-day" return and is dropped below.
    history = macro_data[macro_data.index <= current_date]
    if len(history) < slope_window:
        return None, None  # Not enough data for current slope

    term_structure = history['VIX'] - history['VIX3M']
    x_axis = np.arange(slope_window)
    slope = term_structure.rolling(window=slope_window).apply(
        lambda window: np.polyfit(x_axis, window, 1)[0], raw=True
    )

    spy_prices = prices['SPY US Equity']
    spy_prices = spy_prices[spy_prices.index <= current_date]
    next_day_returns = spy_prices.pct_change().shift(-1)

    df = slope.to_frame('slope')
    df['SPY_return_next'] = next_day_returns
    df = df.dropna(subset=['slope', 'SPY_return_next'])
    if len(df) < window_size:
        return None, None  # Not enough data

    df_window = df.iloc[-window_size:]

    # Split slope into positive and negative parts
    X = pd.DataFrame({
        'slope_pos': df_window['slope'].clip(lower=0),
        'slope_neg': df_window['slope'].clip(upper=0),
    })
    # has_constant='add': always include the intercept. The default would
    # skip it whenever one regressor happens to be constant in the window
    # (e.g. no negative slope at all), leaving fewer coefficients than the
    # three terms used for the prediction below.
    X = sm.add_constant(X, has_constant='add')
    y = df_window['SPY_return_next']
    # One fit over exactly `window_size` rows; this is the single estimate a
    # rolling fit with window == len(data) would produce.
    params = sm.OLS(y, X).fit().params

    # Latest slope = rolling slope of the most recent macro row at or before
    # current_date (never a later row).
    current_slope = slope.iloc[-1]
    if pd.isna(current_slope):
        return None, None  # Not enough data for current slope

    X_current = np.array([1, max(current_slope, 0), min(current_slope, 0)])
    predicted_return = np.dot(params, X_current)
    signal = 'risk-off' if predicted_return < threshold else 'risk-on'
    return predicted_return, signal


###############################################
# 9. Helper Functions for Cash Management
###############################################


def _previous_close_weights(portfolio, prices, prev_date):
    """Return ({ticker: weight}, total value) at `prev_date` prices; equal weights if the total is not positive."""
    values = {ticker: qty * prices.loc[prev_date, ticker] for ticker, qty in portfolio.items()}
    total = sum(values.values())
    if total > 0:
        weights = {ticker: value / total for ticker, value in values.items()}
    else:
        weights = {ticker: 1 / len(portfolio) for ticker in portfolio}
    return weights, total


def invest_cash_into_portfolio(portfolio, prices, current_date, cash_qty, cash_ticker):
    """
    When switching from risk-off to risk-on, sell the cash instrument
    (e.g. SHV) and reinvest its proceeds into the portfolio using
    previous-day weights.

    The cash position is valued and the purchases are executed at
    `current_date` prices.

    Returns:
        (new_portfolio, new_cash_qty, note). If there is nothing to
        reinvest, the inputs are returned unchanged with an explanatory note.
    """
    cash_price = prices.loc[current_date, cash_ticker]
    cash_available = cash_qty * cash_price
    if cash_available <= 0:
        return portfolio, cash_qty, f"No {cash_ticker} to reinvest."

    prev_date = prices.index[prices.index < current_date][-1]
    weights, _ = _previous_close_weights(portfolio, prices, prev_date)

    new_portfolio = portfolio.copy()
    for ticker in portfolio:
        invest_amount = weights[ticker] * cash_available
        new_portfolio[ticker] += invest_amount / prices.loc[current_date, ticker]
    return new_portfolio, 0.0, f"Reinvested {cash_available:,.2f} from {cash_ticker}"


def allocate_cash_from_portfolio(portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker):
    """
    Move value between the portfolio and the cash instrument so that the
    portfolio is worth `target_alloc` of total AUM.

    The portfolio is valued at the previous date's prices, the cash position
    at today's price. If the portfolio is above target, the excess is sold
    pro rata; if it is below target and cash is held, cash (at most what is
    available) is deployed pro rata. Pro-rata weights are the previous-close
    weights, and every trade executes at `current_date` prices, so the
    notional traded in securities always equals the notional booked in cash.

    Returns:
        (new_portfolio, new_cash_qty, note); `note` is empty when nothing
        was traded. The input `portfolio` is not modified.
    """
    prev_date = prices.index[prices.index < current_date][-1]
    weights, curr_value = _previous_close_weights(portfolio, prices, prev_date)

    cash_price = prices.loc[current_date, cash_ticker]
    cash_equiv = cash_qty * cash_price
    total_aum = curr_value + cash_equiv
    desired_value = target_alloc * total_aum

    new_portfolio = portfolio.copy()
    new_cash_qty = cash_qty
    note = ""

    if curr_value > desired_value:
        excess = curr_value - desired_value
        for ticker in portfolio:
            sell_amount = weights[ticker] * excess
            new_portfolio[ticker] -= sell_amount / prices.loc[current_date, ticker]
        new_cash_qty += excess / cash_price
        note = f"Raised {excess:,.2f} into {cash_ticker}"
    elif curr_value < desired_value and cash_qty > 0:
        shortage = desired_value - curr_value
        available_cash = min(shortage, cash_equiv)
        for ticker in portfolio:
            invest_amount = weights[ticker] * available_cash
            new_portfolio[ticker] += invest_amount / prices.loc[current_date, ticker]
        new_cash_qty -= available_cash / cash_price
        note = f"Deployed {available_cash:,.2f} from {cash_ticker} into portfolio"

    return new_portfolio, new_cash_qty, note


###############################################
# 10. Simulation: Strategy with Rolling Regression Signal
###############################################


def simulate_strategy(prices, macro_data, eq_tickers, fi_tickers, alts_tickers, initial_aum,
                      start_date, end_date, rebalance_period, rebalance_ratio, lookback_period,
                      metric_type, internal_rebalance_ratios, cash_ticker='SHV US Equity',
                      macro_max_alloc=1.0, macro_min_alloc=0.6, spy_return_threshold=0.0):
    """
    Simulation using a rolling regression-based signal.

    - Computes the regression signal from the rolling regression on VIX term
      structure slopes.
    - If predicted SPY return is below spy_return_threshold, signal is
      'risk-off' (target allocation = macro_min_alloc), unless the combined
      SPY + HYG weight is below 40%, in which case risk-on is forced.
    - Otherwise, signal is 'risk-on' (target allocation = 1.0).
    - The simulation includes monthly rebalancing and cash management.

    `macro_max_alloc` is accepted for interface compatibility but is not
    read; the risk-on allocation is fixed at 1.0.

    Returns:
        DataFrame indexed by 'Date' with one row per simulated day holding
        AUM figures, regime/signal diagnostics and per-ticker quantity,
        notional and weight columns.
    """
    min_risk_asset_weight = 0.40  # SPY + HYG weight below which risk-off is not allowed

    # Define tickers for momentum (exclude cash_ticker)
    all_tickers = eq_tickers + fi_tickers + alts_tickers
    momentum_tickers = [t for t in all_tickers if t != cash_ticker]

    # A set gives O(1) membership tests inside the daily loop.
    monthly_dates = set(get_observation_dates(prices, start_date, end_date, rebalance_period))
    daily_dates = prices.index.sort_values()
    daily_dates = daily_dates[(daily_dates >= start_date) & (daily_dates <= end_date)]

    # Initialize portfolio (momentum securities only) and cash (in cash_ticker)
    portfolio = initialize_portfolio(prices, start_date, momentum_tickers, initial_aum)
    cash_qty = 0.0
    previous_regime = 'risk-on'
    previous_target_alloc = 1.0
    prev_total_aum = initial_aum
    results = []

    for current_date in daily_dates:
        is_rebalance_day = current_date in monthly_dates
        daily_note = "No adjustment"

        # --- Obtain rolling regression signal ---
        predicted_spy_return, regression_signal = compute_regression_signal_rolling(
            current_date, macro_data, prices,
            window_size=100, slope_window=5, threshold=spy_return_threshold
        )

        # --- For logging: also capture VIX values and spread ---
        vix_1m = macro_data['VIX'].asof(current_date)
        vix_3m = macro_data['VIX3M'].asof(current_date)
        vix_spread = vix_1m - vix_3m if (pd.notna(vix_1m) and pd.notna(vix_3m)) else np.nan

        # --- Determine portfolio value and SPY/HYG weights ---
        prev_date = prices.index[prices.index < current_date][-1]
        mom_value = compute_portfolio_value(portfolio, prices, prev_date)
        spy_weight = (portfolio.get('SPY US Equity', 0) * prices.loc[prev_date, 'SPY US Equity']) / mom_value if mom_value > 0 else 0
        hyg_weight = (portfolio.get('HYG US Equity', 0) * prices.loc[prev_date, 'HYG US Equity']) / mom_value if mom_value > 0 else 0

        # --- Determine regime based solely on regression signal ---
        if regression_signal == 'risk-off':
            # Safeguard: if combined SPY+HYG weight is too low, force risk-on.
            if (spy_weight + hyg_weight) < min_risk_asset_weight:
                current_regime = 'risk-on'
                target_alloc = 1.0
                daily_note = "Forced regime to risk-on (SPY+HYG weight < 40)"
            else:
                current_regime = 'risk-off'
                target_alloc = macro_min_alloc  # e.g., 0.6 for risk-off
        else:
            current_regime = 'risk-on'
            target_alloc = 1.0

        # --- Cash rebalancing logic ---
        alloc_changed_in_risk_off = (current_regime == 'risk-off'
                                     and target_alloc != previous_target_alloc)
        if (previous_regime != current_regime) or alloc_changed_in_risk_off:
            if previous_regime == 'risk-off' and current_regime == 'risk-on' and cash_qty > 0:
                portfolio, cash_qty, note_update = invest_cash_into_portfolio(
                    portfolio, prices, current_date, cash_qty, cash_ticker)
                daily_note += " | " + note_update
            elif (previous_regime == 'risk-on' and current_regime == 'risk-off') or alloc_changed_in_risk_off:
                portfolio, cash_qty, note_update = allocate_cash_from_portfolio(
                    portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker)
                daily_note += " | " + note_update

        # --- Monthly Rebalancing ---
        if is_rebalance_day:
            sorted_tickers, _, _ = rank_assets(prices, current_date, momentum_tickers,
                                               lookback_period, metric_type)
            temp_portfolio, _, _ = rebalance_portfolio(portfolio, prices, current_date,
                                                       momentum_tickers, sorted_tickers,
                                                       internal_rebalance_ratios, rebalance_ratio)
            temp_value = compute_portfolio_value(temp_portfolio, prices, current_date)
            spy_temp = temp_portfolio.get('SPY US Equity', 0) * prices.loc[current_date, 'SPY US Equity']
            hyg_temp = temp_portfolio.get('HYG US Equity', 0) * prices.loc[current_date, 'HYG US Equity']
            combined_weight = (spy_temp + hyg_temp) / temp_value if temp_value > 0 else 0

            if (current_regime == 'risk-off') and (combined_weight < min_risk_asset_weight):
                # The rebalanced book would hold too little SPY+HYG for a
                # risk-off stance: go fully invested at the rebalanced weights.
                current_regime = 'risk-on'
                target_alloc = 1.0
                daily_note += " | Monthly: Forced risk-on (SPY+HYG weight < 40)"
                total_aum = (compute_portfolio_value(portfolio, prices, current_date)
                             + cash_qty * prices.loc[current_date, cash_ticker])
                new_portfolio = {}
                for ticker in temp_portfolio:
                    price = prices.loc[current_date, ticker]
                    simulated_weight = (temp_portfolio[ticker] * price) / temp_value if temp_value > 0 else 1 / len(temp_portfolio)
                    new_portfolio[ticker] = (total_aum * simulated_weight) / price
                portfolio = new_portfolio
                cash_qty = 0
            else:
                portfolio = temp_portfolio
                curr_value = compute_portfolio_value(portfolio, prices, current_date)
                total_aum = curr_value + cash_qty * prices.loc[current_date, cash_ticker]
                desired_value = target_alloc * total_aum
                if curr_value > desired_value or (curr_value < desired_value and cash_qty > 0):
                    portfolio, cash_qty, note_update = allocate_cash_from_portfolio(
                        portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker)
                    daily_note += " | Monthly: " + note_update

            portfolio = adjust_overweight(portfolio, prices, current_date, sorted_tickers, threshold=0.70)

        # Remember the regime the day actually ENDED in. Doing this after the
        # monthly block matters: a forced risk-on there must be visible to
        # tomorrow's transition check, otherwise a following risk-off signal
        # would not raise cash.
        previous_regime = current_regime
        previous_target_alloc = target_alloc

        # --- Update daily AUM calculation ---
        current_mom_value = compute_portfolio_value(portfolio, prices, current_date)
        cash_price = prices.loc[current_date, cash_ticker]
        cash_value = cash_qty * cash_price
        total_aum = current_mom_value + cash_value
        ret = (total_aum - prev_total_aum) / prev_total_aum if prev_total_aum > 0 else 0
        prev_total_aum = total_aum

        # --- Log results ---
        row = {
            'Date': current_date,
            'Momentum AUM': current_mom_value,
            'Cash Qty': cash_qty,
            'Cash Price': cash_price,
            'Cash Value': cash_value,
            'Total AUM': total_aum,
            'Current Regime': current_regime,
            'Target Alloc': target_alloc,
            'Regression Signal': regression_signal,
            'Predicted SPY Return': predicted_spy_return,
            'VIX 1M': vix_1m,
            'VIX 3M': vix_3m,
            'VIX Spread': vix_spread,
            'Adjustment Note': daily_note,
            'Return': ret,
            'Event': 'Monthly Rebalance' if is_rebalance_day else 'Daily Check',
        }
        for ticker in momentum_tickers:
            price = prices.loc[current_date, ticker]
            qty = portfolio[ticker]
            notional = qty * price
            row[f'qty_{ticker}'] = qty
            row[f'notional_{ticker}'] = notional
            row[f'weight_{ticker}'] = (notional / current_mom_value) if current_mom_value > 0 else np.nan
        results.append(row)

    result_df = pd.DataFrame(results)
    result_df.set_index('Date', inplace=True)
    return result_df


###############################################
# 11. Main – Example Usage
###############################################
if __name__ == '__main__':
    # Define asset tickers.
    eq_tickers = ['SPY US Equity']
    fi_tickers = ['TLT US Equity', 'HYG US Equity']
    alts_tickers = ['GLD US Equity', 'IGSB US Equity']  # Do not include cash_ticker here

    # Define the cash ticker (for short-term cash management)
    cash_ticker = 'SHV US Equity'

    initial_aum = 100e6  # 100 million
    start_date = pd.to_datetime('2008-01-01')
    end_date = pd.to_datetime('2025-02-01')
    rebalance_period = 1  # monthly rebalancing
    rebalance_ratio = 0.2  # portion of portfolio rebalanced each month
    lookback_period = 6
    metric_type = 'simple'
    internal_rebalance_ratios = [0.8, 0.2, 0, -0.2, -0.8]

    # File paths (adjust these to your environment).
    price_filepath = r"\\asiapac.nom\data\MUM\IWM\India_IWM_IPAS\Reet\Momentum Strategy\Codes\Historic Prices.xlsx"
    macro_filepath = r"\\asiapac.nom\data\MUM\IWM\India_IWM_IPAS\Reet\Momentum Strategy\Momentum Strategy Overlay Data.xlsx"

    prices = load_price_data(price_filepath)
    macro_data = load_macro_data(macro_filepath)

    # Run simulation.
    result_df = simulate_strategy(prices, macro_data, eq_tickers, fi_tickers, alts_tickers,
                                  initial_aum, start_date, end_date, rebalance_period,
                                  rebalance_ratio, lookback_period, metric_type,
                                  internal_rebalance_ratios, cash_ticker=cash_ticker,
                                  macro_max_alloc=1.0, macro_min_alloc=0.6,
                                  spy_return_threshold=0.0)

    pd.set_option('display.float_format', lambda x: f'{x:,.2f}')
    print(result_df[['Total AUM', 'Momentum AUM', 'Cash Qty', 'Cash Price', 'Cash Value']].tail())
Editor is loading...
Leave a Comment