Untitled

 avatar
unknown
plain_text
a month ago
26 kB
3
Indexable
import pandas as pd
import numpy as np
from datetime import datetime
from dateutil.relativedelta import relativedelta
from scipy.stats import zscore
import statsmodels.api as sm
from statsmodels.regression.rolling import RollingOLS

###############################################
# 1. Data Loading (Filtering Out Weekends)
###############################################

def load_price_data(filepath):
    """
    Load historical prices from an Excel file.
    Assumes that the first column is dates and the remaining columns are tickers.
    Removes weekend data.
    """
    df = pd.read_excel(filepath, index_col=0)
    df.index = pd.to_datetime(df.index)
    df = df.sort_index()
    # Filter out weekends (Saturday=5, Sunday=6)
    df = df[df.index.dayofweek < 5]
    return df

def load_macro_data(filepath):
    """
    Load macro indicators from an Excel file.
    Loads VIX and VIX3M from 'Eq' sheet, LF98TRUU Index from 'FI' sheet,
    and other macro indicators from 'Macro' sheet.
    Removes weekend data.
    """
    # VIX data
    vix_data = pd.read_excel(filepath, sheet_name='Eq', index_col=0, parse_dates=True, usecols=[0, 4, 5])
    vix_data.columns = ['VIX', 'VIX3M']
    vix_data = vix_data[vix_data.index.dayofweek < 5]
    
    # FI data
    cdx_data = pd.read_excel(filepath, sheet_name='FI', index_col=0, parse_dates=True, usecols=[0, 2], skiprows=1)
    cdx_data.columns = ['LF98TRUU']
    cdx_data = cdx_data[cdx_data.index.dayofweek < 5]
    
    # Macro data (assumed to include other indicators)
    macro_data = pd.read_excel(filepath, sheet_name='Macro', index_col=0, parse_dates=True, usecols=range(8), skiprows=1)
    macro_data = macro_data[macro_data.index.dayofweek < 5]
    
    combined_data = pd.concat([vix_data, cdx_data, macro_data], axis=1)
    combined_data = combined_data.fillna(method='ffill').fillna(method='bfill')
    combined_data = combined_data.sort_index()
    return combined_data

###############################################
# 2. Helper: Observation Dates (Monthly)
###############################################

def get_observation_dates(prices, start_date, end_date, rebalance_period):
    dates = []
    current_date = start_date
    while current_date < end_date:
        candidate_date = (current_date + relativedelta(months=rebalance_period)).replace(day=1)
        while candidate_date not in prices.index:
            candidate_date += pd.Timedelta(days=1)
            if candidate_date.month != (current_date + relativedelta(months=rebalance_period)).month:
                candidate_date = None
                break
        if candidate_date is None or candidate_date > end_date:
            break
        dates.append(candidate_date)
        current_date = candidate_date
    return dates

###############################################
# 3. Portfolio Initialization
###############################################

def initialize_portfolio(prices, date, tickers, initial_aum):
    portfolio = {}
    mask = prices.index < date
    prev_date = prices.index[mask][-1]
    allocation = initial_aum / len(tickers)
    for ticker in tickers:
        price = prices.loc[prev_date, ticker]
        portfolio[ticker] = allocation / price
    return portfolio

###############################################
# 4. Lookback Metric Computation
###############################################

def compute_lookback_metric(prices, current_date, ticker, lookback_period, metric_type='simple'):
    prices = prices.sort_index()
    mask = prices.index < current_date
    prev_date = prices.index[mask][-1]
    lookback_date = prev_date - relativedelta(months=lookback_period)
   
    current_price = prices[ticker].asof(prev_date)
    lookback_price = prices[ticker].asof(lookback_date)

    if pd.isna(current_price) or pd.isna(lookback_price):
        raise ValueError(f"Missing price data for {ticker} on {prev_date} or {lookback_date}.")
    if metric_type == 'simple':
        metric = (current_price / lookback_price) - 1
    elif metric_type == 'sma':
        window = prices[ticker].loc[lookback_date:current_date]
        if window.empty:
            raise ValueError(f"No price data for {ticker} between {lookback_date} and {current_date}.")
        sma = window.mean()
        metric = (current_price - sma) / sma
    else:
        raise ValueError("Invalid metric type. Choose 'simple' or 'sma'.")
    return metric

###############################################
# 5. Ranking Assets by Momentum
###############################################

def rank_assets(prices, current_date, tickers, lookback_period, metric_type):
    metrics = {}
    for ticker in tickers:
        metric = compute_lookback_metric(prices, current_date, ticker, lookback_period, metric_type)
        metrics[ticker] = metric
    sorted_tickers = sorted(metrics, key=metrics.get, reverse=True)
    ranks = {ticker: rank+1 for rank, ticker in enumerate(sorted_tickers)}
    return sorted_tickers, ranks, metrics

###############################################
# 6. Compute Current Portfolio Value
###############################################

def compute_portfolio_value(portfolio, prices, current_date):
    value = 0
    for ticker, quantity in portfolio.items():
        price = prices.loc[current_date, ticker]
        value += quantity * price
    return value

###############################################
# 7. Rebalance the Momentum Portfolio
###############################################

def rebalance_portfolio(portfolio, prices, current_date, tickers, sorted_tickers,
                        internal_rebalance_ratios, rebalance_ratio):
    mask = prices.index < current_date
    prev_date = prices.index[mask][-1]
    prev_prices = prices.loc[prev_date]
    curr_prices = prices.loc[current_date]

    portfolio_value = sum(portfolio[ticker] * prev_prices[ticker] for ticker in tickers)
    rebalance_amount = portfolio_value * rebalance_ratio

    target_trades = {ticker: rebalance_amount * internal_rebalance_ratios[i]
                     for i, ticker in enumerate(sorted_tickers)}

    total_sold = 0
    actual_trades = {}

    for ticker, target_trade in target_trades.items():
        if target_trade < 0:
            available_notional = portfolio[ticker] * curr_prices[ticker]
            sell_target = abs(target_trade)
            actual_sell = min(available_notional, sell_target)
            actual_trades[ticker] = -actual_sell
            total_sold += actual_sell
        else:
            actual_trades[ticker] = 0

    total_buy_target = sum(t for t in target_trades.values() if t > 0)
    if total_buy_target > 0:
        for ticker, target_trade in target_trades.items():
            if target_trade > 0:
                proportion = target_trade / total_buy_target
                buy_amount = total_sold * proportion
                actual_trades[ticker] = buy_amount

    new_portfolio = portfolio.copy()
    for ticker, trade_notional in actual_trades.items():
        execution_price = curr_prices[ticker]
        qty_change = trade_notional / execution_price
        new_portfolio[ticker] += qty_change

    return new_portfolio, actual_trades, portfolio_value

def adjust_overweight(portfolio, prices, current_date, sorted_tickers, threshold=0.70):
    mask = prices.index < current_date
    prev_date = prices.index[mask][-1]
    prev_prices = prices.loc[prev_date]
    curr_prices = prices.loc[current_date]

    portfolio_value = sum(portfolio[ticker] * prev_prices[ticker] for ticker in portfolio)
    weights = {ticker: (portfolio[ticker] * prev_prices[ticker]) / portfolio_value
               for ticker in portfolio}

    new_portfolio = portfolio.copy()

    for overweight in portfolio:
        if weights[overweight] > threshold:
            extra_weight = weights[overweight] - threshold
            extra_value = extra_weight * portfolio_value
            execution_price_over = curr_prices[overweight]
            qty_reduce = extra_value / execution_price_over
            new_portfolio[overweight] -= qty_reduce

            remaining_value = extra_value

            for candidate in sorted_tickers:
                if candidate == overweight:
                    continue
                candidate_value = new_portfolio[candidate] * curr_prices[candidate]
                candidate_weight = candidate_value / portfolio_value
                if candidate_weight < threshold:
                    capacity = (threshold - candidate_weight) * portfolio_value
                    allocation = min(remaining_value, capacity)
                    qty_add = allocation / curr_prices[candidate]
                    new_portfolio[candidate] += qty_add
                    remaining_value -= allocation
                    if remaining_value <= 0:
                        break
    return new_portfolio

###############################################
# 8. New Helper: Rolling Regression-Based SPY Return Signal
###############################################

def compute_regression_signal_rolling(current_date, macro_data, prices, window_size=100, slope_window=5, threshold=0.0):
    """
    Computes the next-day SPY return signal using a rolling regression.
    
    Steps:
      1. Compute the TS as (VIX - VIX3M) and its rolling slope over `slope_window` days.
      2. Merge the slope with next-day SPY returns.
      3. Use RollingOLS over the most recent `window_size` observations to estimate:
             SPY_return_next = alpha + beta_pos * max(slope,0) + beta_neg * min(slope,0) + error
      4. Compute the current day's slope (using the same method) and then predict the next-day return.
      5. Return the predicted return and a signal: 'risk-off' if below threshold, else 'risk-on'.
    """
    # Create a copy and compute TS
    temp = macro_data.copy()
    temp['TS'] = temp['VIX'] - temp['VIX3M']
    # Rolling slope calculation
    temp['slope'] = temp['TS'].rolling(window=slope_window).apply(
        lambda x: np.polyfit(range(len(x)), x, 1)[0] if len(x) == slope_window else np.nan,
        raw=True
    )
    
    # Compute next-day SPY returns from prices
    spy_prices = prices['SPY US Equity']
    spy_returns = spy_prices.pct_change().shift(-1)  # next-day returns
    
    # Merge slope and next-day returns
    df = temp[['slope']].copy()
    df['SPY_return_next'] = spy_returns
    df = df.dropna(subset=['slope', 'SPY_return_next'])
    
    # Use only data up to current_date
    df = df.loc[:current_date]
    if len(df) < window_size:
        return None, None  # Not enough data
    
    # Take the most recent window_size observations
    df_window = df.iloc[-window_size:].copy()
    # Split slope into positive and negative parts
    df_window['slope_pos'] = df_window['slope'].apply(lambda x: x if x > 0 else 0)
    df_window['slope_neg'] = df_window['slope'].apply(lambda x: x if x < 0 else 0)
    
    X = df_window[['slope_pos', 'slope_neg']]
    X = sm.add_constant(X)
    y = df_window['SPY_return_next']
    
    # RollingOLS with window=window_size (here the whole window is used)
    model = RollingOLS(endog=y, exog=X, window=window_size)
    rres = model.fit()
    params = rres.params.iloc[-1]  # most recent estimates
    
    # Compute current day's slope
    current_date_eff = current_date if current_date in temp.index else temp.index[-1]
    idx = list(temp.index).index(current_date_eff)
    if idx < slope_window - 1:
        return None, None  # Not enough data for current slope
    window_dates = temp.index[idx - slope_window + 1: idx + 1]
    x_current = np.arange(slope_window)
    y_current = temp.loc[window_dates, 'TS'].values
    current_slope = np.polyfit(x_current, y_current, 1)[0]
    current_slope_pos = current_slope if current_slope > 0 else 0
    current_slope_neg = current_slope if current_slope < 0 else 0
    X_current = np.array([1, current_slope_pos, current_slope_neg])
    
    predicted_return = np.dot(params, X_current)
    signal = 'risk-off' if predicted_return < threshold else 'risk-on'
    return predicted_return, signal

###############################################
# 9. Helper Functions for Cash Management
###############################################

def invest_cash_into_portfolio(portfolio, prices, current_date, cash_qty, cash_ticker):
    """
    When switching from risk-off to risk-on, sell the cash instrument (e.g. SHV)
    and reinvest its proceeds into the portfolio using previous-day weights.
    """
    cash_price = prices.loc[current_date, cash_ticker]
    cash_available = cash_qty * cash_price
    if cash_available <= 0:
        return portfolio, cash_qty, f"No {cash_ticker} to reinvest."
    mask = prices.index < current_date
    prev_date = prices.index[mask][-1]
    mom_value = compute_portfolio_value(portfolio, prices, prev_date)
    new_portfolio = portfolio.copy()
    for ticker in portfolio:
        prev_price = prices.loc[prev_date, ticker]
        weight = (portfolio[ticker] * prev_price) / mom_value if mom_value > 0 else 1/len(portfolio)
        invest_amount = weight * cash_available
        price_today = prices.loc[current_date, ticker]
        qty_to_buy = invest_amount / price_today
        new_portfolio[ticker] += qty_to_buy
    return new_portfolio, 0.0, f"Reinvested {cash_available:,.2f} from {cash_ticker}"

def allocate_cash_from_portfolio(portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker):
    """
    When switching from risk-on to risk-off (or when target_alloc changes),
    sell a proportional amount of portfolio securities to build a cash position
    in the cash_ticker.
    """
    mask = prices.index < current_date
    prev_date = prices.index[mask][-1]
    curr_value = compute_portfolio_value(portfolio, prices, prev_date)
    cash_price = prices.loc[current_date, cash_ticker]
    cash_equiv = cash_qty * cash_price
    total_aum = curr_value + cash_equiv
    desired_value = target_alloc * total_aum
    new_portfolio = portfolio.copy()
    new_cash_qty = cash_qty
    note = ""
    if curr_value > desired_value:
        excess = curr_value - desired_value
        for ticker in portfolio:
            price = prices.loc[current_date, ticker]
            ticker_value = portfolio[ticker] * price
            sell_amount = (ticker_value / curr_value) * excess
            qty_to_sell = sell_amount / price
            new_portfolio[ticker] -= qty_to_sell
        new_cash_qty += excess / cash_price
        note = f"Raised {excess:,.2f} into {cash_ticker}"
    elif curr_value < desired_value and cash_qty > 0:
        shortage = desired_value - curr_value
        available_cash = min(shortage, cash_equiv)
        for ticker in portfolio:
            price = prices.loc[current_date, ticker]
            ticker_value = portfolio[ticker] * price
            weight = (ticker_value / curr_value) if curr_value > 0 else 1/len(portfolio)
            invest_amount = weight * available_cash
            qty_to_buy = invest_amount / price
            new_portfolio[ticker] += qty_to_buy
        new_cash_qty -= available_cash / cash_price
        note = f"Deployed {available_cash:,.2f} from {cash_ticker} into portfolio"
    return new_portfolio, new_cash_qty, note

###############################################
# 10. Simulation: Strategy with Rolling Regression Signal
###############################################

def simulate_strategy(prices, macro_data, eq_tickers, fi_tickers, alts_tickers,
                      initial_aum, start_date, end_date,
                      rebalance_period, rebalance_ratio,
                      lookback_period, metric_type,
                      internal_rebalance_ratios,
                      cash_ticker='SHV US Equity',
                      macro_max_alloc=1.0, macro_min_alloc=0.6,
                      spy_return_threshold=0.0):
    """
    Simulation using a rolling regression-based signal.
      - Computes the regression signal from the rolling regression on VIX term structure slopes.
      - If predicted SPY return is below spy_return_threshold, signal is 'risk-off' (target allocation = macro_min_alloc).
      - Otherwise, signal is 'risk-on' (target allocation = 1.0).
      - The simulation includes monthly rebalancing and cash management.
    """
    # Define tickers for momentum (exclude cash_ticker)
    all_tickers = eq_tickers + fi_tickers + alts_tickers
    momentum_tickers = [t for t in all_tickers if t != cash_ticker]
   
    monthly_dates = get_observation_dates(prices, start_date, end_date, rebalance_period)
    daily_dates = prices.index.sort_values()
    daily_dates = daily_dates[(daily_dates >= start_date) & (daily_dates <= end_date)]
   
    macro_data = macro_data.copy()
   
    # Initialize portfolio (momentum securities only) and cash (in cash_ticker)
    portfolio = initialize_portfolio(prices, start_date, momentum_tickers, initial_aum)
    cash_qty = 0.0
    current_regime = 'risk-on'
    target_alloc = 1.0
    previous_regime = current_regime
    previous_target_alloc = target_alloc
    prev_total_aum = initial_aum

    results = []

    for current_date in daily_dates:
        daily_note = "No adjustment"
       
        # --- Obtain rolling regression signal ---
        predicted_spy_return, regression_signal = compute_regression_signal_rolling(
            current_date, macro_data, prices, window_size=100, slope_window=5, threshold=spy_return_threshold
        )
       
        # --- For logging: also capture VIX values and spread ---
        vix_1m = macro_data['VIX'].asof(current_date)
        vix_3m = macro_data['VIX3M'].asof(current_date)
        vix_spread = vix_1m - vix_3m if (pd.notna(vix_1m) and pd.notna(vix_3m)) else np.nan

        # --- Determine portfolio value and SPY/HYG weights ---
        mask = prices.index < current_date
        prev_date = prices.index[mask][-1]
        mom_value = compute_portfolio_value(portfolio, prices, prev_date)
        spy_weight = (portfolio.get('SPY US Equity', 0) * prices.loc[prev_date, 'SPY US Equity']) / mom_value if mom_value > 0 else 0
        hyg_weight = (portfolio.get('HYG US Equity', 0) * prices.loc[prev_date, 'HYG US Equity']) / mom_value if mom_value > 0 else 0

        # --- Determine regime based solely on regression signal ---
        if regression_signal == 'risk-off':
            # Safeguard: if combined SPY+HYG weight is too low, force risk-on.
            if (spy_weight + hyg_weight) < 0.40:
                current_regime = 'risk-on'
                target_alloc = 1.0
                daily_note = "Forced regime to risk-on (SPY+HYG weight < 40)"
            else:
                current_regime = 'risk-off'
                target_alloc = macro_min_alloc  # e.g., 0.6 for risk-off
        else:
            current_regime = 'risk-on'
            target_alloc = 1.0

        # --- Cash rebalancing logic ---
        if (previous_regime != current_regime) or (current_regime == 'risk-off' and target_alloc != previous_target_alloc):
            if previous_regime == 'risk-off' and current_regime == 'risk-on' and cash_qty > 0:
                portfolio, cash_qty, note_update = invest_cash_into_portfolio(portfolio, prices, current_date, cash_qty, cash_ticker)
                daily_note += " | " + note_update
            elif (previous_regime == 'risk-on' and current_regime == 'risk-off') or (current_regime == 'risk-off' and target_alloc != previous_target_alloc):
                portfolio, cash_qty, note_update = allocate_cash_from_portfolio(portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker)
                daily_note += " | " + note_update

        previous_regime = current_regime
        previous_target_alloc = target_alloc

        # --- Monthly Rebalancing ---
        if current_date in monthly_dates:
            sorted_tickers, ranks, metrics = rank_assets(prices, current_date, momentum_tickers, lookback_period, metric_type)
            temp_portfolio, trades, _ = rebalance_portfolio(portfolio, prices, current_date, momentum_tickers, sorted_tickers, internal_rebalance_ratios, rebalance_ratio)
            temp_value = compute_portfolio_value(temp_portfolio, prices, current_date)
            spy_temp = temp_portfolio.get('SPY US Equity', 0) * prices.loc[current_date, 'SPY US Equity']
            hyg_temp = temp_portfolio.get('HYG US Equity', 0) * prices.loc[current_date, 'HYG US Equity']
            combined_weight = (spy_temp + hyg_temp) / temp_value if temp_value > 0 else 0

            if (current_regime == 'risk-off') and (combined_weight < 0.40):
                current_regime = 'risk-on'
                target_alloc = 1.0
                daily_note += " | Monthly: Forced risk-on (SPY+HYG weight < 40)"
                total_aum = compute_portfolio_value(portfolio, prices, current_date) + cash_qty * prices.loc[current_date, cash_ticker]
                simulated_value = temp_value
                new_portfolio = {}
                for ticker in temp_portfolio:
                    price = prices.loc[current_date, ticker]
                    simulated_weight = (temp_portfolio[ticker] * price) / simulated_value if simulated_value > 0 else 1/len(temp_portfolio)
                    new_qty = (total_aum * simulated_weight) / price
                    new_portfolio[ticker] = new_qty
                portfolio = new_portfolio
                cash_qty = 0
            else:
                portfolio = temp_portfolio
                curr_value = compute_portfolio_value(portfolio, prices, current_date)
                total_aum = curr_value + cash_qty * prices.loc[current_date, cash_ticker]
                desired_value = target_alloc * total_aum
                if curr_value > desired_value:
                    portfolio, cash_qty, note_update = allocate_cash_from_portfolio(portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker)
                    daily_note += " | Monthly: " + note_update
                elif curr_value < desired_value and cash_qty > 0:
                    portfolio, cash_qty, note_update = allocate_cash_from_portfolio(portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker)
                    daily_note += " | Monthly: " + note_update
                portfolio = adjust_overweight(portfolio, prices, current_date, sorted_tickers, threshold=0.70)

        # --- Update daily AUM calculation ---
        current_mom_value = compute_portfolio_value(portfolio, prices, current_date)
        cash_price = prices.loc[current_date, cash_ticker]
        cash_value = cash_qty * cash_price
        total_aum = current_mom_value + cash_value
        ret = (total_aum - prev_total_aum) / prev_total_aum if prev_total_aum > 0 else 0
        prev_total_aum = total_aum

        # --- Log results ---
        row = {
            'Date': current_date,
            'Momentum AUM': current_mom_value,
            'Cash Qty': cash_qty,
            'Cash Price': cash_price,
            'Cash Value': cash_value,
            'Total AUM': total_aum,
            'Current Regime': current_regime,
            'Target Alloc': target_alloc,
            'Regression Signal': regression_signal,
            'Predicted SPY Return': predicted_spy_return,
            'VIX 1M': vix_1m,
            'VIX 3M': vix_3m,
            'VIX Spread': vix_spread,
            'Adjustment Note': daily_note,
            'Return': ret,
            'Event': 'Monthly Rebalance' if current_date in monthly_dates else 'Daily Check'
        }

        for ticker in momentum_tickers:
            price = prices.loc[current_date, ticker]
            qty = portfolio[ticker]
            notional = qty * price
            row[f'qty_{ticker}'] = qty
            row[f'notional_{ticker}'] = notional
            row[f'weight_{ticker}'] = (notional / current_mom_value) if current_mom_value > 0 else np.nan

        results.append(row)

    result_df = pd.DataFrame(results)
    result_df.set_index('Date', inplace=True)
    return result_df

###############################################
# 11. Main – Example Usage
###############################################

if __name__ == '__main__':
    # Define asset tickers.
    eq_tickers   = ['SPY US Equity']
    fi_tickers   = ['TLT US Equity', 'HYG US Equity']
    alts_tickers = ['GLD US Equity', 'IGSB US Equity']  # Do not include cash_ticker here
   
    # Define the cash ticker (for short-term cash management)
    cash_ticker = 'SHV US Equity'
   
    initial_aum = 100e6   # 100 million
    start_date  = pd.to_datetime('2008-01-01')
    end_date    = pd.to_datetime('2025-02-01')
    rebalance_period = 1      # monthly rebalancing
    rebalance_ratio  = 0.2     # portion of portfolio rebalanced each month
    lookback_period  = 6      
    metric_type      = 'simple'
   
    internal_rebalance_ratios = [0.8, 0.2, 0, -0.2, -0.8]
   
    # File paths (adjust these to your environment).
    price_filepath = r"\\asiapac.nom\data\MUM\IWM\India_IWM_IPAS\Reet\Momentum Strategy\Codes\Historic Prices.xlsx"
    macro_filepath = r"\\asiapac.nom\data\MUM\IWM\India_IWM_IPAS\Reet\Momentum Strategy\Momentum Strategy Overlay Data.xlsx"

    prices = load_price_data(price_filepath)
    macro_data = load_macro_data(macro_filepath)
   
    # Run simulation.
    result_df = simulate_strategy(prices, macro_data,
                                  eq_tickers, fi_tickers, alts_tickers,
                                  initial_aum, start_date, end_date,
                                  rebalance_period, rebalance_ratio,
                                  lookback_period, metric_type,
                                  internal_rebalance_ratios,
                                  cash_ticker=cash_ticker,
                                  macro_max_alloc=1.0, macro_min_alloc=0.6,
                                  spy_return_threshold=0.0)
   
    pd.set_option('display.float_format', lambda x: f'{x:,.2f}')
    print(result_df[['Total AUM', 'Momentum AUM', 'Cash Qty', 'Cash Price', 'Cash Value']].tail())
Editor is loading...
Leave a Comment