# Paste-site metadata (not part of the program):
# Untitled | unknown | plain_text | 9 months ago | 27 kB | 12 | Indexable
import pandas as pd
import numpy as np
from datetime import datetime
from dateutil.relativedelta import relativedelta
from scipy.stats import zscore
###############################################
# 1. Data Loading (Filtering Out Weekends)
###############################################
def load_price_data(filepath):
    """
    Read a price history workbook into a date-indexed DataFrame.

    The first column of the sheet becomes the index and is converted to
    datetimes; every other column is kept as-is (assumed to be one ticker
    per column). Rows are returned in chronological order with Saturdays
    and Sundays dropped.
    """
    raw = pd.read_excel(filepath, index_col=0)
    raw.index = pd.to_datetime(raw.index)
    # Chronological order is required by the asof() lookups used downstream.
    chronological = raw.sort_index()
    # dayofweek: Monday=0 ... Sunday=6, so < 5 keeps Monday through Friday.
    is_weekday = chronological.index.dayofweek < 5
    return chronological[is_weekday]
def load_macro_data(filepath):
    """
    Load the overlay indicators from an Excel workbook into one DataFrame.

    Sheets read:
      - 'Eq'    : columns 3-5, renamed to VIX9D, VIX and VIX3M.
      - 'FI'    : column 2 (first row skipped), renamed to LF98TRUU.
      - 'Macro' : the first 8 columns (first row skipped). Must contain
                  "CESIUSD Index", "INJCJC Index" and ".HG/GC G Index";
                  a "Consumer Confidence" column is presumably present too
                  (not used here -- verify against the workbook).

    Weekend rows are removed from every sheet. One-period differences
    ("... Slope" columns) are added for the three macro indicators named
    above. The sheets are then aligned on the union of their dates, sorted
    chronologically and gap-filled forward, then backward.

    Returns:
        DataFrame indexed by date, sorted ascending.
    """
    # VIX term-structure data.
    vix_data = pd.read_excel(filepath, sheet_name='Eq', index_col=0, parse_dates=True, usecols=[0, 3, 4, 5])
    vix_data.columns = ['VIX9D', 'VIX', 'VIX3M']
    vix_data = vix_data[vix_data.index.dayofweek < 5]

    # Fixed-income index.
    cdx_data = pd.read_excel(filepath, sheet_name='FI', index_col=0, parse_dates=True, usecols=[0, 2], skiprows=1)
    cdx_data.columns = ['LF98TRUU']
    cdx_data = cdx_data[cdx_data.index.dayofweek < 5]

    # Macro indicators.
    macro_data = pd.read_excel(filepath, sheet_name='Macro', index_col=0, parse_dates=True, usecols=range(8), skiprows=1)
    macro_data = macro_data[macro_data.index.dayofweek < 5]

    # Slopes are simple first differences of each indicator.
    macro_data["Surprise Index Slope"] = macro_data["CESIUSD Index"].diff()
    macro_data["Jobless Claims Slope"] = macro_data["INJCJC Index"].diff()
    macro_data["Copper Gold Slope"] = macro_data['.HG/GC G Index'].diff()

    combined_data = pd.concat([vix_data, cdx_data, macro_data], axis=1)
    # Sort BEFORE filling: concat does not guarantee a chronological union
    # index, and a forward fill over unsorted rows would copy values from
    # the wrong dates.
    combined_data = combined_data.sort_index()
    # ffill()/bfill() replace the deprecated fillna(method=...) spelling.
    # NOTE(review): the backward fill seeds leading gaps with later values,
    # which is look-ahead at the start of the sample -- confirm acceptable.
    combined_data = combined_data.ffill().bfill()
    return combined_data
###############################################
# 2. Helper: Observation Dates (Monthly)
###############################################
def get_observation_dates(start_date, end_date, rebalance_period):
    """
    Return the observation dates from start_date to end_date (inclusive),
    spaced rebalance_period months apart.

    Each date is computed as an offset from start_date rather than from the
    previous date. relativedelta clamps to the last day of short months, so
    stepping cumulatively from a month-end start would drift permanently
    (Jan 31 -> Feb 29 -> Mar 29 ...); anchoring on start_date keeps the
    original day-of-month whenever the month has it.

    Args:
        start_date: first observation date (datetime-like).
        end_date: last date that may be included.
        rebalance_period: positive whole number of months between dates.

    Returns:
        List of dates; empty if start_date is after end_date.

    Raises:
        ValueError: if rebalance_period is not positive (the loop would
            otherwise never advance).
    """
    if rebalance_period <= 0:
        raise ValueError("rebalance_period must be a positive number of months.")
    dates = []
    step = 0
    current = start_date
    while current <= end_date:
        dates.append(current)
        step += 1
        current = start_date + relativedelta(months=step * rebalance_period)
    return dates
###############################################
# 3. Portfolio Initialization
###############################################
def initialize_portfolio(prices, date, tickers, initial_aum):
    """
    Build the starting holdings by splitting initial_aum equally (by
    notional) across all tickers.

    Each ticker is priced with asof(), i.e. the latest available price on
    or before `date`.

    Returns:
        dict mapping ticker -> quantity held.

    Raises:
        ValueError: if a ticker has no price on or before `date`.
    """
    ordered = prices.sort_index()  # asof() needs a chronologically sorted index
    per_asset_notional = initial_aum / len(tickers)
    holdings = {}
    for name in tickers:
        entry_price = ordered[name].asof(date)
        if pd.isna(entry_price):
            raise ValueError(f"No price available for {name} as of {date}")
        holdings[name] = per_asset_notional / entry_price
    return holdings
###############################################
# 4. Lookback Metric Computation
###############################################
def compute_lookback_metric(prices, current_date, ticker, lookback_period, metric_type='simple'):
    """
    Momentum metric for a single ticker over the trailing lookback_period
    months.

    metric_type:
      - 'simple': price_now / price_at_lookback - 1
      - 'sma'   : (price_now - mean) / mean, where mean is the average
                  price between the lookback date and current_date.

    Both end-point prices are taken with asof(), i.e. the latest price on
    or before the requested date.

    Raises:
        ValueError: if either end-point price is missing, if the 'sma'
            window holds no rows, or if metric_type is not recognised.
    """
    series = prices.sort_index()[ticker]
    window_start = current_date - relativedelta(months=lookback_period)

    price_now = series.asof(current_date)
    price_then = series.asof(window_start)
    if pd.isna(price_now) or pd.isna(price_then):
        raise ValueError(f"Missing price data for {ticker} on {current_date} or {window_start}.")

    if metric_type == 'simple':
        return (price_now / price_then) - 1

    if metric_type == 'sma':
        history = series.loc[window_start:current_date]
        if history.empty:
            raise ValueError(f"No price data for {ticker} between {window_start} and {current_date}.")
        average = history.mean()
        return (price_now - average) / average

    raise ValueError("Invalid metric type. Choose 'simple' or 'sma'.")
###############################################
# 5. Ranking Assets by Momentum
###############################################
def rank_assets(prices, current_date, tickers, lookback_period, metric_type):
    """
    Score every ticker with compute_lookback_metric and order them from
    strongest to weakest.

    Returns:
        sorted_tickers: tickers ordered by metric, highest first.
        ranks: dict ticker -> 1-based position in sorted_tickers (1 = best).
        metrics: dict ticker -> metric value.
    """
    metrics = {
        ticker: compute_lookback_metric(prices, current_date, ticker, lookback_period, metric_type)
        for ticker in tickers
    }
    sorted_tickers = sorted(metrics, key=metrics.get, reverse=True)
    ranks = {ticker: position for position, ticker in enumerate(sorted_tickers, start=1)}
    return sorted_tickers, ranks, metrics
###############################################
# 6. Compute Current Portfolio Value
###############################################
def compute_portfolio_value(portfolio, prices, current_date):
    """
    Mark the portfolio to market: sum of quantity * price over all holdings.

    Prices are read with an exact-label lookup, so current_date must be
    present in the prices index. An empty portfolio is worth 0.
    """
    return sum(
        quantity * prices.loc[current_date, ticker]
        for ticker, quantity in portfolio.items()
    )
###############################################
# 7. Rebalance the Momentum Portfolio
###############################################
def rebalance_portfolio(portfolio, prices, current_date, tickers, sorted_tickers,
                        internal_rebalance_ratios, rebalance_ratio):
    """
    Shift notional from low-ranked to high-ranked assets; cash is untouched.

    The amount to move is portfolio_value * rebalance_ratio. The i-th
    ticker in sorted_tickers targets that amount times
    internal_rebalance_ratios[i]: negative targets are sells, positive
    targets are buys. Sells are capped at the position's current notional,
    and the proceeds actually raised are shared among the buys in
    proportion to their targets, so buys never exceed sells.

    `tickers` is accepted but not used by this function.

    Returns:
        (new_portfolio, actual_trades, portfolio_value) where
        new_portfolio is a copy of `portfolio` with updated quantities,
        actual_trades maps ticker -> signed notional traded, and
        portfolio_value is the value before any trade.
    """
    portfolio_value = compute_portfolio_value(portfolio, prices, current_date)
    turnover_budget = portfolio_value * rebalance_ratio

    target_trades = {}
    for position, ticker in enumerate(sorted_tickers):
        target_trades[ticker] = turnover_budget * internal_rebalance_ratios[position]

    # Pass 1: execute sells, limited by what is actually held.
    actual_trades = {}
    total_sold = 0
    for ticker, wanted in target_trades.items():
        price = prices.loc[current_date, ticker]
        if wanted < 0:
            proceeds = min(portfolio[ticker] * price, abs(wanted))
            actual_trades[ticker] = -proceeds
            total_sold += proceeds
        else:
            actual_trades[ticker] = 0

    # Pass 2: distribute the sale proceeds pro rata across the buy targets.
    buy_targets = {ticker: wanted for ticker, wanted in target_trades.items() if wanted > 0}
    total_buy_target = sum(buy_targets.values())
    if total_buy_target > 0:
        for ticker, wanted in buy_targets.items():
            actual_trades[ticker] = total_sold * (wanted / total_buy_target)

    # Pass 3: convert traded notionals to quantity changes.
    new_portfolio = portfolio.copy()
    for ticker, trade_notional in actual_trades.items():
        new_portfolio[ticker] += trade_notional / prices.loc[current_date, ticker]

    return new_portfolio, actual_trades, portfolio_value
###############################################
# 8. VIX-based Allocation Function
###############################################
def momentum_allocation(vix_value, vix_mean, vix_std, max_alloc=1.0, min_alloc=0.6, mid_alloc=0.8):
    """
    Map a VIX reading to a target momentum allocation fraction.

    Rule, measured in standard deviations above the mean:
      - below mean + 1 std          -> max_alloc (fully risk-on)
      - from +1 std up to +2 std    -> mid_alloc
      - at or above mean + 2 std    -> min_alloc (risk-off)

    If any input is NaN (e.g. a rolling std with a single observation) the
    signal is undefined and max_alloc is returned, matching the risk-on
    default the macro and FI signal functions use for missing data.
    Without this guard every comparison against NaN is False and the
    function would silently report risk-off.

    Args:
        vix_value: current (smoothed) VIX level.
        vix_mean: reference mean of the VIX level.
        vix_std: reference standard deviation of the VIX level.
        max_alloc: allocation in the calm regime.
        min_alloc: allocation in the stressed regime.
        mid_alloc: allocation in the intermediate band (default 0.8,
            the value that was previously hard-coded).

    Returns:
        The target allocation fraction.
    """
    if pd.isna(vix_value) or pd.isna(vix_mean) or pd.isna(vix_std):
        return max_alloc
    if vix_value < vix_mean + vix_std:
        return max_alloc
    if vix_value < vix_mean + 2 * vix_std:
        return mid_alloc
    return min_alloc
###############################################
# 9. Composite Macro Signal Function
###############################################
def compute_composite_series(macro_data, window):
    """
    Build a composite macro index for each of the last `window` rows of
    macro_data.

    For every such date, each indicator ("CESIUSD Index", "INJCJC Index",
    ".HG/GC G Index") is min-max scaled using the up-to-`window` rows
    ending at that date, and the scaled value at that date is taken. An
    indicator that is missing from the frame, or flat over the window,
    scores 0.5. The "INJCJC Index" score is inverted (1 - score) and the
    composite is the plain average of the three scores.

    Returns:
        pandas Series of composite values indexed by date.
    """
    indicator_cols = ("CESIUSD Index", "INJCJC Index", ".HG/GC G Index")
    inverted_col = "INJCJC Index"

    def _scaled_latest(frame, col):
        # Position of the latest value inside the window's [min, max] range.
        if col not in frame.columns:
            return 0.5
        low = frame[col].min()
        high = frame[col].max()
        if high - low == 0:
            return 0.5
        return (frame[col].iloc[-1] - low) / (high - low)

    evaluation_dates = macro_data.loc[:macro_data.index[-1]].tail(window).index

    composites = {}
    for dt in evaluation_dates:
        trailing = macro_data.loc[:dt].tail(window)
        scores = {col: _scaled_latest(trailing, col) for col in indicator_cols}
        scores[inverted_col] = 1 - scores[inverted_col]
        composites[dt] = np.mean([scores[col] for col in indicator_cols])
    return pd.Series(composites)
def macro_signal_allocation_std(macro_data, current_date, macro_max_alloc, macro_min_alloc,
                                rolling_window=252):
    """
    Turn the composite macro index into a target allocation and a label.

    The composite series is computed from data up to and including
    current_date only, over the trailing rolling_window rows. Using the
    full data set instead would (a) leak future observations into the
    signal and (b) leave every date earlier than the final window with no
    composite at all, so the signal would be stuck on its default.

    Classification of the current composite against the window's mean (mu)
    and standard deviation (sigma):
      - below mu + sigma        -> macro_max_alloc, "risk-on"
      - below mu + 2 * sigma    -> midpoint of max and min, "neutral"
      - otherwise               -> macro_min_alloc, "risk-off"

    If there is no history up to current_date, or the composite, mean or
    standard deviation is NaN, the default (macro_max_alloc, "risk-on") is
    returned.

    Args:
        macro_data: date-indexed DataFrame, sorted ascending, holding the
            indicator columns compute_composite_series reads.
        current_date: date the signal is evaluated for.
        macro_max_alloc: allocation when the signal is risk-on.
        macro_min_alloc: allocation when the signal is risk-off.
        rolling_window: number of trailing rows in the window.

    Returns:
        (target_alloc, signal) with signal one of "risk-on", "neutral",
        "risk-off".
    """
    history = macro_data.loc[:current_date]
    if history.empty:
        return macro_max_alloc, "risk-on"

    comp_series = compute_composite_series(history, rolling_window)
    mu = comp_series.mean()
    sigma = comp_series.std()
    current_composite = comp_series.asof(current_date)
    # With a single observation sigma is NaN; comparisons against NaN are
    # always False and would otherwise fall through to "risk-off".
    if pd.isna(current_composite) or pd.isna(mu) or pd.isna(sigma):
        return macro_max_alloc, "risk-on"

    if current_composite < mu + sigma:
        return macro_max_alloc, "risk-on"
    if current_composite < mu + 2 * sigma:
        # Midpoint allocation is the intermediate state, labelled the same
        # way the FI signal labels its midpoint.
        return (macro_max_alloc + macro_min_alloc) / 2, "neutral"
    return macro_min_alloc, "risk-off"
###############################################
# 10. Refined FI Signal Function (8 EMA and 13 EMA)
###############################################
def compute_fi_target_allocation(macro_data, current_date, fi_max_alloc, fi_min_alloc, slope_threshold=0.01, tol=0.001):
    """
    Derive the FI target allocation from the "FI_EMA_8" and "FI_EMA_13"
    columns of macro_data, both read as of current_date.

    Decision order:
      1. Either EMA missing                      -> fi_max_alloc, "risk-on".
      2. FI_EMA_8 below FI_EMA_13                -> fi_max_alloc, "risk-on".
      3. FI_EMA_8 within `tol` of FI_EMA_13      -> midpoint, "neutral".
      4. FI_EMA_8 above FI_EMA_13:
           slope of FI_EMA_8 > slope_threshold   -> fi_min_alloc, "risk-off"
           otherwise                             -> midpoint, "neutral".

    The slope is FI_EMA_8 as of current_date minus its value at the
    previous row of macro_data (0 when fewer than two rows are available).
    The function keeps no state between calls, so each call is classified
    independently of earlier signals.

    Returns:
        (target_alloc, signal_label) with signal_label one of "risk-on",
        "neutral", "risk-off".
    """
    fast_series = macro_data["FI_EMA_8"]
    fast = fast_series.asof(current_date)
    slow = macro_data["FI_EMA_13"].asof(current_date)
    if pd.isna(fast) or pd.isna(slow):
        return fi_max_alloc, "risk-on"

    # One-row change in the fast EMA.
    history_index = macro_data.loc[:current_date].index
    if len(history_index) >= 2:
        slope = fast - fast_series.asof(history_index[-2])
    else:
        slope = 0

    neutral = ((fi_max_alloc + fi_min_alloc) / 2, "neutral")
    if fast < slow:
        return fi_max_alloc, "risk-on"
    if abs(fast - slow) < tol:
        return neutral
    # Remaining case: fast EMA is above the slow EMA by at least tol.
    if slope > slope_threshold:
        return fi_min_alloc, "risk-off"
    return neutral
###############################################
# 11. Simulation: VIX-based Strategy with Signal Logging
###############################################
def _align_to_trading_days(scheduled_dates, trading_days):
    """Map each scheduled date to the first trading day on or after it; drop those past the last trading day."""
    aligned = set()
    for scheduled in scheduled_dates:
        pos = trading_days.searchsorted(scheduled, side='left')
        if pos < len(trading_days):
            aligned.add(trading_days[pos])
    return aligned


def _adjust_cash_sleeve(portfolio, prices, current_date, cash, target_alloc, label):
    """
    Move value between the momentum holdings and cash toward target_alloc.

    Mutates `portfolio` in place, scaling every position pro rata to its
    current value. Returns (new_cash, cash_adjustment, note);
    cash_adjustment and note are None when no trade was needed or possible.
    """
    mom_value = compute_portfolio_value(portfolio, prices, current_date)
    desired_mom_value = target_alloc * (mom_value + cash)

    if mom_value > desired_mom_value:
        excess = mom_value - desired_mom_value
        for ticker in portfolio:
            price = prices.loc[current_date, ticker]
            ticker_value = portfolio[ticker] * price
            portfolio[ticker] -= ((ticker_value / mom_value) * excess) / price
        # Positive adjustment means cash was raised.
        return cash + excess, excess, f"{label}: Sold excess {excess:.2f} to CASH."

    if mom_value < desired_mom_value and cash > 0:
        available = min(desired_mom_value - mom_value, cash)
        for ticker in portfolio:
            price = prices.loc[current_date, ticker]
            ticker_value = portfolio[ticker] * price
            # Fall back to equal weights if the holdings are worth nothing.
            weight = ticker_value / mom_value if mom_value > 0 else 1 / len(portfolio)
            portfolio[ticker] += (weight * available) / price
        # Negative adjustment means cash was spent.
        return cash - available, -available, f"{label}: Bought using {available:.2f} CASH."

    return cash, None, None


def _build_log_row(current_date, portfolio, prices, tickers, mom_value, cash, signal_fields,
                   note, cash_adjustment, ret, event, ranks, metrics, trades):
    """Assemble one result row: portfolio totals, signal fields, then per-ticker detail."""
    row = {
        'Date': current_date,
        'Momentum AUM': mom_value,
        'CASH': cash,
        'Total AUM': mom_value + cash,
        **signal_fields,
        'Adjustment Note': note,
        'Cash Adjustment': cash_adjustment,
        'Return': ret,
        'Event': event,
    }
    for ticker in tickers:
        qty = portfolio[ticker]
        notional = qty * prices.loc[current_date, ticker]
        row[f'qty_{ticker}'] = qty
        row[f'notional_{ticker}'] = notional
        row[f'weight_{ticker}'] = (notional / mom_value) if mom_value > 0 else np.nan
        row[f'rank_{ticker}'] = ranks.get(ticker, np.nan)
        row[f'metric_{ticker}'] = metrics.get(ticker, np.nan)
        row[f'trade_{ticker}'] = trades.get(ticker, 0)
    return row


def simulate_strategy(prices, macro_data, eq_tickers, fi_tickers, alts_tickers,
                      initial_aum, start_date, end_date,
                      rebalance_period, rebalance_ratio,
                      lookback_period, metric_type,
                      internal_rebalance_ratios,
                      macro_max_alloc=1.0, macro_min_alloc=0.6,
                      macro_overlay_frequency=10):
    """
    Run the momentum strategy with a VIX-driven cash overlay and return a
    log of the days on which something happened.

    Schedule:
      - Overlay: evaluated on the first price date that is at least
        macro_overlay_frequency calendar days after the previous overlay
        (or after start_date). The VIX, composite-macro and FI signals are
        all computed and logged, but only the VIX signal moves money: when
        its regime label changes, holdings are scaled toward the new
        target allocation with the difference held in cash.
      - Momentum rebalance: scheduled every rebalance_period months from
        start_date. A scheduled date that is not a price date (weekend,
        holiday) is executed on the next available price date instead of
        being skipped. After the rebalance the cash split is re-aligned to
        the current target allocation.

    Logged rows carry 'Event' = 'Monthly Rebalance' or 'Macro Overlay'. On
    rebalance rows 'Return' is the change in total AUM since the previous
    rebalance row (or since initial_aum); on overlay rows it is NaN.

    Args:
        prices: date-indexed DataFrame of prices, one column per ticker.
        macro_data: date-indexed DataFrame with at least "VIX9D",
            "LF98TRUU" and the columns the macro composite reads.
        eq_tickers, fi_tickers, alts_tickers: lists of tickers; their
            concatenation is the investable universe.
        initial_aum: starting capital, split equally across the universe.
        start_date, end_date: inclusive simulation bounds.
        rebalance_period: months between momentum rebalances.
        rebalance_ratio: fraction of momentum AUM moved per rebalance.
        lookback_period: momentum lookback in months.
        metric_type: 'simple' or 'sma' (see compute_lookback_metric).
        internal_rebalance_ratios: per-rank share of the rebalanced amount.
        macro_max_alloc, macro_min_alloc: bounds of the momentum allocation.
        macro_overlay_frequency: minimum calendar days between overlays.

    Returns:
        DataFrame indexed by 'Date' with one row per logged event.

    Raises:
        ValueError: if prices has no dates within [start_date, end_date],
            or if the VIX fields cannot be read for an overlay date.
    """
    tickers = eq_tickers + fi_tickers + alts_tickers

    all_days = prices.index.sort_values()
    trading_days = all_days[(all_days >= start_date) & (all_days <= end_date)]
    if len(trading_days) == 0:
        raise ValueError(f"No price dates between {start_date} and {end_date}.")

    # Exact membership in the raw schedule would drop every rebalance that
    # falls on a non-trading day, so align the schedule to real price dates.
    scheduled_dates = get_observation_dates(start_date, end_date, rebalance_period)
    rebalance_dates = _align_to_trading_days(scheduled_dates, trading_days)

    # Derived signal inputs, computed on a private copy of the macro data.
    macro_data = macro_data.copy()
    macro_data['VIX9D_EMA'] = macro_data["VIX9D"].ewm(span=5, adjust=False).mean()
    macro_data["Mean"] = macro_data['VIX9D_EMA'].rolling(window=504, min_periods=1).mean()
    macro_data["Std"] = macro_data['VIX9D_EMA'].rolling(window=504, min_periods=1).std()
    macro_data["FI_EMA_8"] = macro_data["LF98TRUU"].ewm(span=8, adjust=False).mean()
    macro_data["FI_EMA_13"] = macro_data["LF98TRUU"].ewm(span=13, adjust=False).mean()

    portfolio = initialize_portfolio(prices, start_date, tickers, initial_aum)
    cash = 0.0

    # Until the first overlay runs, every signal reports fully risk-on.
    vix_target_alloc, vix_signal = macro_max_alloc, "risk-on"
    macro_target_alloc, macro_signal = macro_max_alloc, "risk-on"
    fi_target_alloc, fi_signal = macro_max_alloc, "risk-on"
    current_regime = 'risk-on'
    target_alloc = macro_max_alloc

    results = []
    prev_total_aum = initial_aum
    last_macro_date = start_date

    for current_date in trading_days:
        note = "No adjustment"
        cash_adjustment = 0

        overlay_ran = (current_date - last_macro_date).days >= macro_overlay_frequency
        if overlay_ran:
            # --- VIX signal ---
            try:
                vix_ema = macro_data['VIX9D_EMA'].asof(current_date)
                vix_mean = macro_data['Mean'].asof(current_date)
                vix_std = macro_data['Std'].asof(current_date)
            except Exception as e:
                raise ValueError(f"Error retrieving VIX data for {current_date}: {e}") from e
            vix_target_alloc = momentum_allocation(vix_ema, vix_mean, vix_std,
                                                   max_alloc=macro_max_alloc,
                                                   min_alloc=macro_min_alloc)
            vix_signal = "risk-on" if vix_target_alloc == macro_max_alloc else "risk-off"

            # --- Composite macro signal (logged only) ---
            macro_target_alloc, macro_signal = macro_signal_allocation_std(
                macro_data, current_date, macro_max_alloc, macro_min_alloc, rolling_window=252)

            # --- FI signal (logged only) ---
            fi_target_alloc, fi_signal = compute_fi_target_allocation(
                macro_data, current_date, macro_max_alloc, macro_min_alloc, slope_threshold=0.01)

            # Only a change of the VIX regime label moves money.
            # NOTE(review): both non-max allocations share the "risk-off"
            # label, so a move between them does not update target_alloc --
            # confirm this is intended.
            if vix_signal != current_regime:
                current_regime = vix_signal
                target_alloc = vix_target_alloc
                cash, adjustment, adjustment_note = _adjust_cash_sleeve(
                    portfolio, prices, current_date, cash, target_alloc, "Adj")
                if adjustment is not None:
                    cash_adjustment, note = adjustment, adjustment_note
            last_macro_date = current_date

        is_rebalance_day = current_date in rebalance_dates
        if not (is_rebalance_day or overlay_ran):
            continue  # nothing happened today, so nothing is logged

        ranks, metrics, trades = {}, {}, {}
        if is_rebalance_day:
            sorted_tickers, ranks, metrics = rank_assets(
                prices, current_date, tickers, lookback_period, metric_type)
            portfolio, trades, _ = rebalance_portfolio(
                portfolio, prices, current_date, tickers, sorted_tickers,
                internal_rebalance_ratios, rebalance_ratio)
            cash, adjustment, adjustment_note = _adjust_cash_sleeve(
                portfolio, prices, current_date, cash, target_alloc, "Monthly")
            if adjustment is not None:
                cash_adjustment, note = adjustment, adjustment_note
            else:
                note = "Monthly: No cash adjustment needed."

        mom_value = compute_portfolio_value(portfolio, prices, current_date)
        if is_rebalance_day:
            total_aum = mom_value + cash
            ret = (total_aum - prev_total_aum) / prev_total_aum
            prev_total_aum = total_aum
            event = 'Monthly Rebalance'
        else:
            ret = np.nan
            event = 'Macro Overlay'

        signal_fields = {
            'Current Regime (VIX)': current_regime,
            'VIX Target': vix_target_alloc,
            'VIX Signal': vix_signal,
            'Macro Target': macro_target_alloc,
            'Macro Signal': macro_signal,
            'FI Target': fi_target_alloc,
            'FI Signal': fi_signal,
            'Target Momentum Alloc': target_alloc,
        }
        results.append(_build_log_row(
            current_date, portfolio, prices, tickers, mom_value, cash, signal_fields,
            note, cash_adjustment, ret, event, ranks, metrics, trades))

    result_df = pd.DataFrame(results)
    result_df.set_index('Date', inplace=True)
    return result_df
###############################################
# 12. Main – Example Usage
###############################################
if __name__ == '__main__':
    # --- Investable universe ---
    eq_tickers = ['SPY US Equity']
    fi_tickers = ['TLT US Equity', 'HYG US Equity']
    alts_tickers = ['GLD US Equity', 'SHV US Equity', 'VNQ US Equity']

    # --- Backtest window and capital ---
    initial_aum = 100e6  # 100 million
    start_date = pd.to_datetime('2012-01-01')
    end_date = pd.to_datetime('2025-02-01')

    # --- Momentum settings ---
    rebalance_period = 1    # months between rebalances
    rebalance_ratio = 0.2   # share of momentum AUM moved at each rebalance
    lookback_period = 6     # months of history behind the momentum metric
    metric_type = 'simple'
    # One entry per rank, best first: positive = buy share, negative = sell share.
    internal_rebalance_ratios = [0.7, 0.3, 0, 0, -0.3, -0.7]

    # --- Overlay settings ---
    macro_overlay_frequency = 30  # calendar days between overlay evaluations

    # --- Input workbooks (adjust these paths to your environment) ---
    price_filepath = r"\\asiapac.nom\data\MUM\IWM\India_IWM_IPAS\Reet\Momentum Strategy\Codes\Historic Prices.xlsx"
    macro_filepath = r"\\asiapac.nom\data\MUM\IWM\India_IWM_IPAS\Reet\Momentum Strategy\Momentum Strategy Overlay Data.xlsx"
    prices = load_price_data(price_filepath)
    macro_data = load_macro_data(macro_filepath)

    # --- Run the simulation ---
    result_df = simulate_strategy(
        prices, macro_data,
        eq_tickers, fi_tickers, alts_tickers,
        initial_aum, start_date, end_date,
        rebalance_period, rebalance_ratio,
        lookback_period, metric_type,
        internal_rebalance_ratios,
        macro_max_alloc=1.0, macro_min_alloc=0.6,
        macro_overlay_frequency=macro_overlay_frequency,
    )

    # --- Report: thousands separators, two decimals; event rows only ---
    pd.set_option('display.float_format', lambda x: f'{x:,.2f}')
    is_event_row = result_df['Event'].isin(['Macro Overlay', 'Monthly Rebalance'])
    filtered_df = result_df[is_event_row]
    print(filtered_df)
# Paste-site UI text (not part of the program): "Editor is loading..." / "Leave a Comment"