# Untitled
# unknown
# plain_text
# 9 months ago
# 26 kB
# 14
# Indexable
import pandas as pd
import numpy as np
from datetime import datetime
from dateutil.relativedelta import relativedelta
from scipy.stats import zscore
import statsmodels.api as sm
from statsmodels.regression.rolling import RollingOLS
###############################################
# 1. Data Loading (Filtering Out Weekends)
###############################################
def load_price_data(filepath):
    """
    Read the historical price workbook into a date-indexed DataFrame.

    The first spreadsheet column becomes the index and is converted to
    datetimes; the remaining columns are kept unchanged (one per ticker,
    per the original author's note). Rows are ordered chronologically and
    every row dated on a Saturday or Sunday is discarded.
    """
    frame = pd.read_excel(filepath, index_col=0)
    frame.index = pd.to_datetime(frame.index)
    frame = frame.sort_index()
    # dayofweek runs Monday=0 ... Sunday=6, so 5 and 6 are the weekend.
    is_weekday = frame.index.dayofweek < 5
    return frame[is_weekday]
def load_macro_data(filepath):
    """
    Load the macro overlay indicators from an Excel workbook.

    Three sheets are read and joined column-wise on their date index:

    * ``'Eq'``    - spreadsheet columns 0, 4 and 5, renamed to ``VIX`` and
      ``VIX3M`` (column 0 is the date index).
    * ``'FI'``    - spreadsheet columns 0 and 2 (first row skipped), renamed
      to ``LF98TRUU``.
    * ``'Macro'`` - the first eight spreadsheet columns (first row skipped),
      kept under their own headers.

    Weekend rows are removed from every sheet. The combined frame is sorted
    chronologically *before* gaps are filled, so the forward fill always
    propagates the most recent earlier observation.

    Returns:
        DataFrame indexed by date with VIX, VIX3M, LF98TRUU and the Macro
        sheet columns, with no remaining gaps other than all-NaN columns.
    """
    # NOTE(review): unlike the other two sheets, 'Eq' is read without
    # skiprows=1 - confirm its header layout really differs.
    vix_data = pd.read_excel(filepath, sheet_name='Eq', index_col=0, parse_dates=True, usecols=[0, 4, 5])
    vix_data.columns = ['VIX', 'VIX3M']
    vix_data = vix_data[vix_data.index.dayofweek < 5]

    fi_data = pd.read_excel(filepath, sheet_name='FI', index_col=0, parse_dates=True, usecols=[0, 2], skiprows=1)
    fi_data.columns = ['LF98TRUU']
    fi_data = fi_data[fi_data.index.dayofweek < 5]

    macro_data = pd.read_excel(filepath, sheet_name='Macro', index_col=0, parse_dates=True, usecols=range(8), skiprows=1)
    macro_data = macro_data[macro_data.index.dayofweek < 5]

    combined_data = pd.concat([vix_data, fi_data, macro_data], axis=1)
    # Sort first: filling an unsorted union index would carry values across
    # dates in the wrong chronological order.
    combined_data = combined_data.sort_index()
    # .ffill()/.bfill() replace the deprecated fillna(method=...) spelling.
    # NOTE(review): the back-fill writes later observations into earlier
    # leading gaps, which is look-ahead for any backtest that starts there.
    combined_data = combined_data.ffill().bfill()
    return combined_data
###############################################
# 2. Helper: Observation Dates (Monthly)
###############################################
def get_observation_dates(prices, start_date, end_date, rebalance_period):
    """
    Build the list of rebalance (observation) dates.

    Starting from ``start_date``, step forward ``rebalance_period`` months at
    a time and pick the first date of the target month that is present in
    ``prices.index``. The search stops as soon as a target month has no date
    in the index or the chosen date falls after ``end_date``.

    Args:
        prices: DataFrame whose index holds the available trading dates.
        start_date: Date the stepping starts from (it is never returned
            itself; the first result lies ``rebalance_period`` months later).
        end_date: Last admissible observation date (inclusive).
        rebalance_period: Positive number of months between observations.

    Returns:
        List of observation dates in ascending order.

    Raises:
        ValueError: If ``rebalance_period`` is smaller than 1; such a value
            would never move forward in time and loop forever.
    """
    if rebalance_period < 1:
        raise ValueError("rebalance_period must be a positive number of months.")

    trading_days = prices.index
    step = pd.DateOffset(months=rebalance_period)
    dates = []
    current_date = start_date
    while current_date < end_date:
        month_start = (current_date + step).replace(day=1)
        candidate_date = month_start
        # Walk forward day by day until a trading date is found, giving up
        # once the walk leaves the target month.
        while candidate_date not in trading_days:
            candidate_date += pd.Timedelta(days=1)
            if candidate_date.month != month_start.month:
                candidate_date = None
                break
        if candidate_date is None or candidate_date > end_date:
            break
        dates.append(candidate_date)
        current_date = candidate_date
    return dates
###############################################
# 3. Portfolio Initialization
###############################################
def initialize_portfolio(prices, date, tickers, initial_aum):
    """
    Create an equal-weighted starting portfolio.

    ``initial_aum`` is split evenly across ``tickers`` and converted into
    quantities using the prices of the last index date strictly before
    ``date``.

    Args:
        prices: DataFrame of prices indexed by date, one column per ticker.
        date: Portfolio start date; the row just before it supplies prices.
        tickers: Non-empty sequence of column names to invest in.
        initial_aum: Total notional to allocate.

    Returns:
        Dict mapping each ticker to the quantity held.

    Raises:
        ValueError: If ``tickers`` is empty, no price row precedes ``date``,
            or a ticker's price on that row is missing or not positive
            (which would otherwise yield NaN/inf holdings).
    """
    if len(tickers) == 0:
        raise ValueError("Cannot initialize a portfolio with no tickers.")

    earlier_dates = prices.index[prices.index < date]
    if len(earlier_dates) == 0:
        raise ValueError(f"No price data available before {date}.")
    prev_date = earlier_dates[-1]

    allocation = initial_aum / len(tickers)
    portfolio = {}
    for ticker in tickers:
        price = prices.loc[prev_date, ticker]
        if pd.isna(price) or price <= 0:
            raise ValueError(f"Missing or non-positive price for {ticker} on {prev_date}.")
        portfolio[ticker] = allocation / price
    return portfolio
###############################################
# 4. Lookback Metric Computation
###############################################
def compute_lookback_metric(prices, current_date, ticker, lookback_period, metric_type='simple'):
    """
    Compute a momentum metric for one ticker as of the previous trading date.

    All inputs are taken from dates strictly before ``current_date``: the
    "current" price is the last known price on the latest index date before
    ``current_date`` and the reference point lies ``lookback_period`` months
    earlier.

    Args:
        prices: DataFrame of prices indexed by date.
        current_date: Decision date; its own price is never used.
        ticker: Column of ``prices`` to evaluate.
        lookback_period: Lookback length in months.
        metric_type: ``'simple'`` for the price return over the lookback, or
            ``'sma'`` for the relative distance of the current price from the
            mean price over the lookback window.

    Returns:
        The metric as a float.

    Raises:
        ValueError: If there is no price history before ``current_date``, a
            required price is missing, the SMA window is empty, or
            ``metric_type`` is unknown.
    """
    series = prices[ticker]
    # Only the one column is sorted, and only when needed; asof() requires
    # a chronologically ordered index.
    if not series.index.is_monotonic_increasing:
        series = series.sort_index()

    earlier_dates = series.index[series.index < current_date]
    if len(earlier_dates) == 0:
        raise ValueError(f"No price data for {ticker} before {current_date}.")
    prev_date = earlier_dates[-1]
    lookback_date = prev_date - pd.DateOffset(months=lookback_period)

    current_price = series.asof(prev_date)
    lookback_price = series.asof(lookback_date)
    if pd.isna(current_price) or pd.isna(lookback_price):
        raise ValueError(f"Missing price data for {ticker} on {prev_date} or {lookback_date}.")

    if metric_type == 'simple':
        return (current_price / lookback_price) - 1
    if metric_type == 'sma':
        # The window ends at prev_date, not current_date: including the
        # decision day's own price would leak information that every other
        # input of this function treats as not yet known.
        window = series.loc[lookback_date:prev_date].dropna()
        if window.empty:
            raise ValueError(f"No price data for {ticker} between {lookback_date} and {prev_date}.")
        sma = window.mean()
        return (current_price - sma) / sma
    raise ValueError("Invalid metric type. Choose 'simple' or 'sma'.")
###############################################
# 5. Ranking Assets by Momentum
###############################################
def rank_assets(prices, current_date, tickers, lookback_period, metric_type):
    """
    Score every ticker with the lookback metric and order them best-first.

    Returns:
        A 3-tuple ``(sorted_tickers, ranks, metrics)`` where
        ``sorted_tickers`` lists the tickers by descending metric, ``ranks``
        maps each ticker to its 1-based position in that list, and
        ``metrics`` maps each ticker to its raw metric value.
    """
    metrics = {
        name: compute_lookback_metric(prices, current_date, name, lookback_period, metric_type)
        for name in tickers
    }
    sorted_tickers = sorted(metrics, key=metrics.get, reverse=True)
    ranks = {name: position for position, name in enumerate(sorted_tickers, start=1)}
    return sorted_tickers, ranks, metrics
###############################################
# 6. Compute Current Portfolio Value
###############################################
def compute_portfolio_value(portfolio, prices, current_date):
    """
    Return the notional value of ``portfolio`` at the prices of ``current_date``.

    ``portfolio`` maps ticker -> quantity; each quantity is multiplied by the
    ticker's price on the ``current_date`` row of ``prices`` and the products
    are summed. An empty portfolio is worth 0.
    """
    return sum(
        quantity * prices.loc[current_date, ticker]
        for ticker, quantity in portfolio.items()
    )
###############################################
# 7. Rebalance the Momentum Portfolio
###############################################
def rebalance_portfolio(portfolio, prices, current_date, tickers, sorted_tickers,
                        internal_rebalance_ratios, rebalance_ratio):
    """
    Shift part of the portfolio from low-ranked to high-ranked tickers.

    The portfolio is valued at the previous trading date's prices;
    ``rebalance_ratio`` of that value is the amount to rotate. Each ticker in
    ``sorted_tickers`` receives the ratio at the same position of
    ``internal_rebalance_ratios``: negative ratios are sell targets, positive
    ratios are buy targets. Sells are capped at the notional actually held
    (at ``current_date`` prices) and the total proceeds are distributed over
    the buy targets in proportion to their size. Trades execute at
    ``current_date`` prices.

    If no ticker has a positive target there is nowhere to put sale proceeds,
    so no trade is made at all; selling anyway would silently remove value
    from the portfolio.

    Args:
        portfolio: Dict ticker -> quantity (not modified).
        prices: DataFrame of prices indexed by date.
        current_date: Execution date; must be present in ``prices.index``.
        tickers: Tickers included in the portfolio valuation.
        sorted_tickers: Tickers ordered best-first.
        internal_rebalance_ratios: One ratio per position in
            ``sorted_tickers`` (extra trailing ratios are ignored).
        rebalance_ratio: Fraction of portfolio value to rotate.

    Returns:
        ``(new_portfolio, actual_trades, portfolio_value)`` where
        ``actual_trades`` maps ticker -> signed traded notional and
        ``portfolio_value`` is the previous-date valuation.

    Raises:
        ValueError: If fewer ratios than ranked tickers are supplied or no
            price row precedes ``current_date``.
    """
    if len(internal_rebalance_ratios) < len(sorted_tickers):
        raise ValueError("internal_rebalance_ratios needs one entry per ranked ticker.")
    earlier_dates = prices.index[prices.index < current_date]
    if len(earlier_dates) == 0:
        raise ValueError(f"No price data available before {current_date}.")

    prev_prices = prices.loc[earlier_dates[-1]]
    curr_prices = prices.loc[current_date]

    portfolio_value = sum(portfolio[ticker] * prev_prices[ticker] for ticker in tickers)
    rebalance_amount = portfolio_value * rebalance_ratio
    target_trades = {
        ticker: rebalance_amount * ratio
        for ticker, ratio in zip(sorted_tickers, internal_rebalance_ratios)
    }

    actual_trades = {ticker: 0 for ticker in target_trades}
    total_buy_target = sum(target for target in target_trades.values() if target > 0)
    if total_buy_target > 0:
        total_sold = 0
        for ticker, target in target_trades.items():
            if target < 0:
                # Never sell more than is held at today's prices.
                available_notional = portfolio[ticker] * curr_prices[ticker]
                sold = min(available_notional, abs(target))
                actual_trades[ticker] = -sold
                total_sold += sold
        # Buys are funded purely by what was actually sold.
        for ticker, target in target_trades.items():
            if target > 0:
                actual_trades[ticker] = total_sold * (target / total_buy_target)

    new_portfolio = portfolio.copy()
    for ticker, trade_notional in actual_trades.items():
        new_portfolio[ticker] += trade_notional / curr_prices[ticker]
    return new_portfolio, actual_trades, portfolio_value
def adjust_overweight(portfolio, prices, current_date, sorted_tickers, threshold=0.70):
    """
    Trim positions whose weight exceeds ``threshold`` and redistribute them.

    Weights are measured at the previous trading date's prices. For every
    ticker above ``threshold`` the excess value is sold at ``current_date``
    prices and offered to the other tickers in ``sorted_tickers`` order; each
    candidate can absorb value only up to its own ``threshold`` weight
    (measured at ``current_date`` prices against the previous-date portfolio
    value). Any part of the excess that no candidate can absorb is put back
    into the trimmed position so the adjustment never removes value from the
    portfolio.

    Args:
        portfolio: Dict ticker -> quantity (not modified).
        prices: DataFrame of prices indexed by date.
        current_date: Execution date; must be present in ``prices.index``.
        sorted_tickers: Candidate order for receiving trimmed value.
        threshold: Maximum weight per ticker.

    Returns:
        A new dict ticker -> quantity. A portfolio without positive value is
        returned unchanged (as a copy).

    Raises:
        ValueError: If no price row precedes ``current_date``.
    """
    earlier_dates = prices.index[prices.index < current_date]
    if len(earlier_dates) == 0:
        raise ValueError(f"No price data available before {current_date}.")
    prev_prices = prices.loc[earlier_dates[-1]]
    curr_prices = prices.loc[current_date]

    holding_values = {ticker: portfolio[ticker] * prev_prices[ticker] for ticker in portfolio}
    portfolio_value = sum(holding_values.values())
    new_portfolio = portfolio.copy()
    # Weights are undefined for a zero, negative or NaN total.
    if not portfolio_value > 0:
        return new_portfolio

    for overweight, held_value in holding_values.items():
        weight = held_value / portfolio_value
        if not weight > threshold:
            continue
        extra_value = (weight - threshold) * portfolio_value
        new_portfolio[overweight] -= extra_value / curr_prices[overweight]

        remaining_value = extra_value
        for candidate in sorted_tickers:
            if candidate == overweight:
                continue
            candidate_weight = (new_portfolio[candidate] * curr_prices[candidate]) / portfolio_value
            if candidate_weight < threshold:
                capacity = (threshold - candidate_weight) * portfolio_value
                allocation = min(remaining_value, capacity)
                new_portfolio[candidate] += allocation / curr_prices[candidate]
                remaining_value -= allocation
                if remaining_value <= 0:
                    break

        if remaining_value > 0:
            # Nobody could take the rest: undo that part of the trim rather
            # than letting the value disappear.
            new_portfolio[overweight] += remaining_value / curr_prices[overweight]
    return new_portfolio
###############################################
# 8. New Helper: Rolling Regression-Based SPY Return Signal
###############################################
def compute_regression_signal_rolling(current_date, macro_data, prices, window_size=100, slope_window=5, threshold=0.0):
    """
    Predict the next-day SPY return from the VIX term-structure slope.

    Steps:
      1. TS = VIX - VIX3M; its slope is the least-squares line slope over the
         trailing ``slope_window`` observations.
      2. Each slope is paired with the SPY return from that date to the next
         date in the SPY price series.
      3. An OLS fit over the most recent ``window_size`` pairs estimates
             SPY_return_next = alpha + beta_pos * max(slope, 0)
                                     + beta_neg * min(slope, 0) + error
      4. The fitted coefficients are applied to the latest slope available on
         ``current_date``.

    Only information dated on or before ``current_date`` is used: both input
    series are cut at ``current_date`` first, so a (slope, next-day return)
    pair enters the fit only once its return has been realised, and a
    ``current_date`` missing from the macro index falls back to the latest
    *earlier* macro row rather than the end of the macro history.

    The fit uses ``numpy.linalg.lstsq``; regressing over exactly one window of
    ``window_size`` rows is a single ordinary least-squares problem.

    Args:
        current_date: Date the signal is computed for.
        macro_data: DataFrame with ``VIX`` and ``VIX3M`` columns, date-indexed.
        prices: DataFrame with a ``'SPY US Equity'`` column, date-indexed.
        window_size: Number of most recent observations used for the fit.
        slope_window: Number of observations per slope estimate.
        threshold: Predicted returns strictly below this value are risk-off.

    Returns:
        ``(predicted_return, signal)`` with ``signal`` equal to ``'risk-off'``
        or ``'risk-on'``, or ``(None, None)`` when there is not enough history
        for the current slope or for a full regression window.
    """
    term_structure = macro_data['VIX'] - macro_data['VIX3M']
    spy_prices = prices['SPY US Equity']
    # Label slicing and rolling windows below need chronological order.
    if not term_structure.index.is_monotonic_increasing:
        term_structure = term_structure.sort_index()
    if not spy_prices.index.is_monotonic_increasing:
        spy_prices = spy_prices.sort_index()

    # Discard everything that is not yet known on current_date.
    term_structure = term_structure.loc[:current_date]
    spy_prices = spy_prices.loc[:current_date]
    if len(term_structure) < slope_window:
        return None, None  # Not enough data for current slope

    x_axis = np.arange(slope_window)
    slope = term_structure.rolling(window=slope_window).apply(
        lambda values: np.polyfit(x_axis, values, 1)[0],
        raw=True,
    )
    current_slope = slope.iloc[-1]
    if pd.isna(current_slope):
        return None, None  # Not enough data for current slope

    # Return from each date to the next; the last known date has no realised
    # next-day return yet and drops out below. Gaps in the price series are
    # carried forward before differencing.
    next_day_return = spy_prices.ffill().pct_change(fill_method=None).shift(-1)

    training = slope.to_frame('slope')
    training['SPY_return_next'] = next_day_return
    training = training.dropna(subset=['slope', 'SPY_return_next'])
    if len(training) < window_size:
        return None, None  # Not enough data
    training = training.iloc[-window_size:]

    slopes = training['slope'].to_numpy()
    design = np.column_stack([
        np.ones(len(slopes)),
        np.maximum(slopes, 0.0),  # positive part of the slope
        np.minimum(slopes, 0.0),  # negative part of the slope
    ])
    coefficients = np.linalg.lstsq(design, training['SPY_return_next'].to_numpy(), rcond=None)[0]

    current_features = np.array([1.0, max(current_slope, 0.0), min(current_slope, 0.0)])
    predicted_return = float(np.dot(coefficients, current_features))
    signal = 'risk-off' if predicted_return < threshold else 'risk-on'
    return predicted_return, signal
###############################################
# 9. Helper Functions for Cash Management
###############################################
def invest_cash_into_portfolio(portfolio, prices, current_date, cash_qty, cash_ticker):
    """
    Liquidate the whole cash-instrument position and spread it over the portfolio.

    The proceeds (``cash_qty`` valued at the ``current_date`` price of
    ``cash_ticker``) are split across the existing holdings according to their
    weights at the previous trading date's prices (equal weights if the
    portfolio had no positive value then) and bought at ``current_date``
    prices.

    Returns:
        ``(portfolio, cash_qty, note)``. When there is nothing to reinvest the
        inputs are handed back untouched; otherwise a new holdings dict, a
        cash quantity of 0.0 and a description of the amount reinvested.
    """
    proceeds = cash_qty * prices.loc[current_date, cash_ticker]
    if proceeds <= 0:
        return portfolio, cash_qty, f"No {cash_ticker} to reinvest."

    earlier_dates = prices.index[prices.index < current_date]
    prev_date = earlier_dates[-1]
    mom_value = compute_portfolio_value(portfolio, prices, prev_date)

    updated = portfolio.copy()
    for name, held_qty in portfolio.items():
        prev_price = prices.loc[prev_date, name]
        if mom_value > 0:
            share = (held_qty * prev_price) / mom_value
        else:
            share = 1 / len(portfolio)
        spend = share * proceeds
        updated[name] += spend / prices.loc[current_date, name]
    return updated, 0.0, f"Reinvested {proceeds:,.2f} from {cash_ticker}"
def allocate_cash_from_portfolio(portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker):
    """
    Move value between the portfolio and the cash instrument to hit ``target_alloc``.

    The decision is taken on the portfolio's value at the previous trading
    date's prices plus the cash position valued at the ``current_date`` cash
    price: the portfolio should hold ``target_alloc`` of that total.

    * Portfolio above target - the excess is sold pro rata and added to cash.
    * Portfolio below target and cash held - the shortfall (capped at the
      cash available) is bought pro rata and removed from cash.

    Trades execute at ``current_date`` prices and are split across holdings
    by their ``current_date`` weights, so the notional sold or bought always
    equals the cash booked. A sale is additionally capped at the portfolio's
    ``current_date`` value so holdings cannot go negative.

    Args:
        portfolio: Dict ticker -> quantity (not modified).
        prices: DataFrame of prices indexed by date.
        current_date: Execution date; must be present in ``prices.index``.
        target_alloc: Desired portfolio share of total AUM.
        cash_qty: Quantity of ``cash_ticker`` currently held.
        cash_ticker: Column of ``prices`` used as the cash instrument.

    Returns:
        ``(new_portfolio, new_cash_qty, note)``; ``note`` is an empty string
        when no trade was required.

    Raises:
        ValueError: If no price row precedes ``current_date``.
    """
    earlier_dates = prices.index[prices.index < current_date]
    if len(earlier_dates) == 0:
        raise ValueError(f"No price data available before {current_date}.")
    prev_date = earlier_dates[-1]

    # Sizing uses the previous-date valuation.
    curr_value = sum(qty * prices.loc[prev_date, ticker] for ticker, qty in portfolio.items())
    cash_price = prices.loc[current_date, cash_ticker]
    cash_equiv = cash_qty * cash_price
    total_aum = curr_value + cash_equiv
    desired_value = target_alloc * total_aum

    # Execution uses today's valuation so the per-ticker shares sum to one.
    exec_prices = {ticker: prices.loc[current_date, ticker] for ticker in portfolio}
    exec_values = {ticker: portfolio[ticker] * exec_prices[ticker] for ticker in portfolio}
    exec_total = sum(exec_values.values())

    new_portfolio = portfolio.copy()
    new_cash_qty = cash_qty
    note = ""
    if curr_value > desired_value:
        excess = curr_value - desired_value
        if exec_total > 0:
            excess = min(excess, exec_total)
            for ticker, value in exec_values.items():
                sell_amount = (value / exec_total) * excess
                new_portfolio[ticker] -= sell_amount / exec_prices[ticker]
        else:
            # Nothing of value left to sell today.
            excess = 0.0
        new_cash_qty += excess / cash_price
        note = f"Raised {excess:,.2f} into {cash_ticker}"
    elif curr_value < desired_value and cash_qty > 0:
        shortage = desired_value - curr_value
        available_cash = min(shortage, cash_equiv)
        for ticker, value in exec_values.items():
            weight = (value / exec_total) if exec_total > 0 else 1 / len(portfolio)
            invest_amount = weight * available_cash
            new_portfolio[ticker] += invest_amount / exec_prices[ticker]
        new_cash_qty -= available_cash / cash_price
        note = f"Deployed {available_cash:,.2f} from {cash_ticker} into portfolio"
    return new_portfolio, new_cash_qty, note
###############################################
# 10. Simulation: Strategy with Rolling Regression Signal
###############################################
def simulate_strategy(prices, macro_data, eq_tickers, fi_tickers, alts_tickers,
                      initial_aum, start_date, end_date,
                      rebalance_period, rebalance_ratio,
                      lookback_period, metric_type,
                      internal_rebalance_ratios,
                      cash_ticker='SHV US Equity',
                      macro_max_alloc=1.0, macro_min_alloc=0.6,
                      spy_return_threshold=0.0):
    """
    Run the daily momentum simulation with the regression-based risk overlay.

    For every price date between ``start_date`` and ``end_date``:

    1. The regression signal is computed. ``'risk-off'`` sets the target
       portfolio allocation to ``macro_min_alloc`` unless the combined SPY+HYG
       weight (at the previous date's prices) is below 40%, in which case the
       regime is forced to risk-on. Anything else is risk-on with a target
       allocation of 1.0.
    2. On a regime change (or a changed risk-off target) value is moved
       between the portfolio and ``cash_ticker``.
    3. On observation dates the momentum rebalance runs. If the regime is
       risk-off but the rebalanced SPY+HYG weight is below 40%, the regime is
       forced to risk-on and all cash is reinvested at the rebalanced weights;
       otherwise the rebalanced portfolio is re-aligned with the target
       allocation. The overweight cap (70%) is applied afterwards.
    4. AUM, return, regime and per-ticker holdings are logged.

    The regime and target allocation remembered for the next day are recorded
    at the end of the day, after the monthly step, so a risk-on regime forced
    by the monthly step is seen by the next day's transition check.

    Args:
        prices: DataFrame of prices indexed by date; must contain every
            momentum ticker, ``cash_ticker``, ``'SPY US Equity'`` and
            ``'HYG US Equity'``.
        macro_data: DataFrame with ``VIX`` and ``VIX3M`` columns.
        eq_tickers, fi_tickers, alts_tickers: Ticker lists; their
            concatenation minus ``cash_ticker`` is the momentum universe.
        initial_aum: Starting notional.
        start_date, end_date: Simulation bounds (inclusive).
        rebalance_period: Months between momentum rebalances.
        rebalance_ratio: Fraction of the portfolio rotated per rebalance.
        lookback_period: Momentum lookback in months.
        metric_type: Momentum metric name passed to the ranking step.
        internal_rebalance_ratios: Per-rank rotation ratios.
        cash_ticker: Instrument used to hold cash.
        macro_max_alloc: Accepted for interface compatibility; the risk-on
            target allocation is currently fixed at 1.0.
        macro_min_alloc: Target portfolio allocation in risk-off.
        spy_return_threshold: Predicted-return level below which the signal
            is risk-off.

    Returns:
        DataFrame indexed by date with one row per simulated day.
    """
    # Define tickers for momentum (exclude cash_ticker)
    all_tickers = eq_tickers + fi_tickers + alts_tickers
    momentum_tickers = [t for t in all_tickers if t != cash_ticker]
    # A set gives constant-time "is today a rebalance day?" checks.
    monthly_dates = set(get_observation_dates(prices, start_date, end_date, rebalance_period))
    daily_dates = prices.index.sort_values()
    daily_dates = daily_dates[(daily_dates >= start_date) & (daily_dates <= end_date)]
    macro_data = macro_data.copy()

    # Initialize portfolio (momentum securities only) and cash (in cash_ticker)
    portfolio = initialize_portfolio(prices, start_date, momentum_tickers, initial_aum)
    cash_qty = 0.0
    current_regime = 'risk-on'
    target_alloc = 1.0
    previous_regime = current_regime
    previous_target_alloc = target_alloc
    prev_total_aum = initial_aum
    results = []

    for current_date in daily_dates:
        daily_note = "No adjustment"
        is_rebalance_day = current_date in monthly_dates

        # --- Obtain rolling regression signal ---
        predicted_spy_return, regression_signal = compute_regression_signal_rolling(
            current_date, macro_data, prices, window_size=100, slope_window=5, threshold=spy_return_threshold
        )

        # --- For logging: also capture VIX values and spread ---
        vix_1m = macro_data['VIX'].asof(current_date)
        vix_3m = macro_data['VIX3M'].asof(current_date)
        vix_spread = vix_1m - vix_3m if (pd.notna(vix_1m) and pd.notna(vix_3m)) else np.nan

        # --- Determine portfolio value and SPY/HYG weights ---
        mask = prices.index < current_date
        prev_date = prices.index[mask][-1]
        mom_value = compute_portfolio_value(portfolio, prices, prev_date)
        spy_weight = (portfolio.get('SPY US Equity', 0) * prices.loc[prev_date, 'SPY US Equity']) / mom_value if mom_value > 0 else 0
        hyg_weight = (portfolio.get('HYG US Equity', 0) * prices.loc[prev_date, 'HYG US Equity']) / mom_value if mom_value > 0 else 0

        # --- Determine regime based solely on regression signal ---
        if regression_signal == 'risk-off':
            # Safeguard: if combined SPY+HYG weight is too low, force risk-on.
            if (spy_weight + hyg_weight) < 0.40:
                current_regime = 'risk-on'
                target_alloc = 1.0
                daily_note = "Forced regime to risk-on (SPY+HYG weight < 40)"
            else:
                current_regime = 'risk-off'
                target_alloc = macro_min_alloc  # e.g., 0.6 for risk-off
        else:
            current_regime = 'risk-on'
            target_alloc = 1.0

        # --- Cash rebalancing logic ---
        regime_changed = previous_regime != current_regime
        risk_off_retarget = current_regime == 'risk-off' and target_alloc != previous_target_alloc
        if regime_changed or risk_off_retarget:
            if previous_regime == 'risk-off' and current_regime == 'risk-on' and cash_qty > 0:
                portfolio, cash_qty, note_update = invest_cash_into_portfolio(
                    portfolio, prices, current_date, cash_qty, cash_ticker)
                daily_note += " | " + note_update
            elif (previous_regime == 'risk-on' and current_regime == 'risk-off') or risk_off_retarget:
                portfolio, cash_qty, note_update = allocate_cash_from_portfolio(
                    portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker)
                daily_note += " | " + note_update

        # --- Monthly Rebalancing ---
        if is_rebalance_day:
            sorted_tickers, _ranks, _metrics = rank_assets(
                prices, current_date, momentum_tickers, lookback_period, metric_type)
            temp_portfolio, _trades, _ = rebalance_portfolio(
                portfolio, prices, current_date, momentum_tickers, sorted_tickers,
                internal_rebalance_ratios, rebalance_ratio)
            temp_value = compute_portfolio_value(temp_portfolio, prices, current_date)
            spy_temp = temp_portfolio.get('SPY US Equity', 0) * prices.loc[current_date, 'SPY US Equity']
            hyg_temp = temp_portfolio.get('HYG US Equity', 0) * prices.loc[current_date, 'HYG US Equity']
            combined_weight = (spy_temp + hyg_temp) / temp_value if temp_value > 0 else 0

            if (current_regime == 'risk-off') and (combined_weight < 0.40):
                current_regime = 'risk-on'
                target_alloc = 1.0
                daily_note += " | Monthly: Forced risk-on (SPY+HYG weight < 40)"
                # Redeploy portfolio + cash at the rebalanced weights.
                total_aum = compute_portfolio_value(portfolio, prices, current_date) + cash_qty * prices.loc[current_date, cash_ticker]
                simulated_value = temp_value
                new_portfolio = {}
                for ticker in temp_portfolio:
                    price = prices.loc[current_date, ticker]
                    simulated_weight = (temp_portfolio[ticker] * price) / simulated_value if simulated_value > 0 else 1 / len(temp_portfolio)
                    new_portfolio[ticker] = (total_aum * simulated_weight) / price
                portfolio = new_portfolio
                cash_qty = 0
            else:
                portfolio = temp_portfolio
                curr_value = compute_portfolio_value(portfolio, prices, current_date)
                total_aum = curr_value + cash_qty * prices.loc[current_date, cash_ticker]
                desired_value = target_alloc * total_aum
                # Both directions are handled by the same helper.
                if curr_value > desired_value or (curr_value < desired_value and cash_qty > 0):
                    portfolio, cash_qty, note_update = allocate_cash_from_portfolio(
                        portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker)
                    daily_note += " | Monthly: " + note_update

            # NOTE(review): the source lost its indentation; this cap is
            # assumed to apply after every monthly rebalance (both branches
            # above) - confirm against the original script.
            portfolio = adjust_overweight(portfolio, prices, current_date, sorted_tickers, threshold=0.70)

        # Remember the regime that is actually in force at the end of the day,
        # including a risk-on regime forced by the monthly step; otherwise the
        # next risk-off day would not be recognised as a transition.
        previous_regime = current_regime
        previous_target_alloc = target_alloc

        # --- Update daily AUM calculation ---
        current_mom_value = compute_portfolio_value(portfolio, prices, current_date)
        cash_price = prices.loc[current_date, cash_ticker]
        cash_value = cash_qty * cash_price
        total_aum = current_mom_value + cash_value
        ret = (total_aum - prev_total_aum) / prev_total_aum if prev_total_aum > 0 else 0
        prev_total_aum = total_aum

        # --- Log results ---
        row = {
            'Date': current_date,
            'Momentum AUM': current_mom_value,
            'Cash Qty': cash_qty,
            'Cash Price': cash_price,
            'Cash Value': cash_value,
            'Total AUM': total_aum,
            'Current Regime': current_regime,
            'Target Alloc': target_alloc,
            'Regression Signal': regression_signal,
            'Predicted SPY Return': predicted_spy_return,
            'VIX 1M': vix_1m,
            'VIX 3M': vix_3m,
            'VIX Spread': vix_spread,
            'Adjustment Note': daily_note,
            'Return': ret,
            'Event': 'Monthly Rebalance' if is_rebalance_day else 'Daily Check',
        }
        for ticker in momentum_tickers:
            price = prices.loc[current_date, ticker]
            qty = portfolio[ticker]
            notional = qty * price
            row[f'qty_{ticker}'] = qty
            row[f'notional_{ticker}'] = notional
            row[f'weight_{ticker}'] = (notional / current_mom_value) if current_mom_value > 0 else np.nan
        results.append(row)

    result_df = pd.DataFrame(results)
    result_df.set_index('Date', inplace=True)
    return result_df
###############################################
# 11. Main – Example Usage
###############################################
if __name__ == '__main__':
    # Input workbooks (adjust these to your environment).
    price_filepath = r"\\asiapac.nom\data\MUM\IWM\India_IWM_IPAS\Reet\Momentum Strategy\Codes\Historic Prices.xlsx"
    macro_filepath = r"\\asiapac.nom\data\MUM\IWM\India_IWM_IPAS\Reet\Momentum Strategy\Momentum Strategy Overlay Data.xlsx"

    # Momentum universe by asset class; the cash instrument is kept separate
    # and must not appear in any of these lists.
    eq_tickers = ['SPY US Equity']
    fi_tickers = ['TLT US Equity', 'HYG US Equity']
    alts_tickers = ['GLD US Equity', 'IGSB US Equity']
    cash_ticker = 'SHV US Equity'  # short-term cash management

    # Backtest window and starting capital (100 million).
    initial_aum = 100e6
    start_date = pd.to_datetime('2008-01-01')
    end_date = pd.to_datetime('2025-02-01')

    # Momentum settings: rebalance every month, rotate 20% of the portfolio
    # each time, rank on a 6-month simple return, one ratio per rank.
    rebalance_period = 1
    rebalance_ratio = 0.2
    lookback_period = 6
    metric_type = 'simple'
    internal_rebalance_ratios = [0.8, 0.2, 0, -0.2, -0.8]

    prices = load_price_data(price_filepath)
    macro_data = load_macro_data(macro_filepath)

    # Run simulation.
    result_df = simulate_strategy(
        prices=prices,
        macro_data=macro_data,
        eq_tickers=eq_tickers,
        fi_tickers=fi_tickers,
        alts_tickers=alts_tickers,
        initial_aum=initial_aum,
        start_date=start_date,
        end_date=end_date,
        rebalance_period=rebalance_period,
        rebalance_ratio=rebalance_ratio,
        lookback_period=lookback_period,
        metric_type=metric_type,
        internal_rebalance_ratios=internal_rebalance_ratios,
        cash_ticker=cash_ticker,
        macro_max_alloc=1.0,
        macro_min_alloc=0.6,
        spy_return_threshold=0.0,
    )

    # Show the last few days of the AUM breakdown with thousands separators.
    pd.set_option('display.float_format', lambda x: f'{x:,.2f}')
    summary_columns = ['Total AUM', 'Momentum AUM', 'Cash Qty', 'Cash Price', 'Cash Value']
    print(result_df[summary_columns].tail())
# Editor is loading...
# Leave a Comment