GPU Backtest with Numba CUDA: PCR-Based Options Strategy (Single Expiry)
import logging
import pandas as pd
import numpy as np
from numba import cuda

# -------------------------------
# 1) Configure Logging
# -------------------------------
logger = logging.getLogger("GPUBacktest")
logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
logger.addHandler(ch)

# -------------------------------
# 2) Hardcoded JSON Conditions
#    (Simplified from your example)
# -------------------------------
BULLISH_ENTRY_PCR = 0.7        # "Bullish": PCR > 0.7 => entry: PE
HEAVILY_BULLISH_PCR = 1.2375   # "Heavily Bullish": PCR > 1.2375 => exit CE
BEARISH_ENTRY_PCR = 1.1875     # "Bearish": PCR < 1.1875 => entry: CE
HEAVILY_BEARISH_PCR = 0.65     # "Heavily Bearish": PCR < 0.65 => exit PE
UNIVERSAL_STOP_LOSS = -4.0     # universal SL
UNIVERSAL_TARGET = 10.0        # universal target

# We map "PE" to position_side = +1 and "CE" to position_side = -1.
# So if we are in a PE, we want to exit on "Heavily Bearish";
# if we are in a CE, we want to exit on "Heavily Bullish".

# -------------------------------
# 3) GPU Kernel
#    One thread = one parameter variant
#    (Here, we only demonstrate one expiry for simplicity.)
# -------------------------------
@cuda.jit
def backtest_single_expiry_kernel(pcr_array,
                                  option_close_array,
                                  # JSON threshold constants
                                  bull_entry,
                                  heavy_bull_exit,
                                  bear_entry,
                                  heavy_bear_exit,
                                  universal_sl,
                                  universal_target,
                                  # output
                                  pnl_array):
    """
    pcr_array, option_close_array: float32 arrays representing a single
    expiry's time series. We assume both have the same length (N bars).

    Each thread handles one set of parameters (if we had multiple variants).
    Here, we show a minimal example: we pass the thresholds as scalars.
    For demonstration, each thread uses the SAME thresholds. If you have
    multiple variants, you typically store each variant's thresholds in
    arrays and index them with thread_id.

    The kernel loops over the time dimension sequentially to handle
    open/close logic. We allow only one position at a time.
    """
    idx = cuda.grid(1)

    # For demonstration, assume we only have 1 thread (or a small number).
    # With multiple variants you would return when idx >= #variants;
    # but let's keep it simple:
    if idx == 0:
        n = pcr_array.size  # number of bars

        in_position = False
        position_side = 0.0   # +1 = PE, -1 = CE
        entry_price = 0.0
        total_pnl = 0.0

        # Loop over each bar/time in this single expiry
        for i in range(n):
            pcr_val = pcr_array[i]
            close_px = option_close_array[i]

            # If in position, check universal SL/target
            if in_position:
                current_pnl = (close_px - entry_price) * position_side
                # universal SL or target
                if current_pnl <= universal_sl or current_pnl >= universal_target:
                    total_pnl += current_pnl
                    in_position = False

            # If still in position, check exit signals
            #  - If in PE (+1), exit if "Heavily Bearish" => PCR < heavy_bear_exit
            #  - If in CE (-1), exit if "Heavily Bullish" => PCR > heavy_bull_exit
            if in_position:
                # PE => exit if pcr_val < heavy_bear_exit
                if position_side > 0 and pcr_val < heavy_bear_exit:
                    current_pnl = (close_px - entry_price) * position_side
                    total_pnl += current_pnl
                    in_position = False
                # CE => exit if pcr_val > heavy_bull_exit
                elif position_side < 0 and pcr_val > heavy_bull_exit:
                    current_pnl = (close_px - entry_price) * position_side
                    total_pnl += current_pnl
                    in_position = False

            # If not in position, check entry signals
            #  - Bullish => PCR > bull_entry => open PE
            #  - Bearish => PCR < bear_entry => open CE
            if not in_position:
                if pcr_val > bull_entry:
                    # open PE
                    in_position = True
                    position_side = 1.0
                    entry_price = close_px
                elif pcr_val < bear_entry:
                    # open CE
                    in_position = True
                    position_side = -1.0
                    entry_price = close_px

        # At the end, if still in position, close at the last bar
        if in_position:
            last_close = option_close_array[n - 1]
            final_pnl = (last_close - entry_price) * position_side
            total_pnl += final_pnl

        pnl_array[0] = total_pnl

    # If you had multiple threads (variants), you'd do:
    #   thread_id = cuda.grid(1)
    #   load thresholds from arrays[thread_id]
    #   store to pnl_array[thread_id]


def main():
    # --------------------------------------------------------
    # A) Read the pickled data from disk
    # --------------------------------------------------------
    logger.info("Reading pickle files...")
    pcr_df = pd.read_pickle("pcr_data.pkl")          # (Date, Time, PCR)
    expiry_df = pd.read_pickle("expiry_data.pkl")    # (Expiries, Status)
    options_df = pd.read_pickle("options_data.pkl")  # (Date, Time, Open, ..., ExpiryDate)

    logger.info(f"PCR DF shape: {pcr_df.shape}, columns: {pcr_df.columns}")
    logger.info(f"Expiry DF shape: {expiry_df.shape}, columns: {expiry_df.columns}")
    logger.info(f"Options DF shape: {options_df.shape}, columns: {options_df.columns}")

    # --------------------------------------------------------
    # B) Select the first expiry for demonstration
    #    (once working, you can loop over all expiries)
    # --------------------------------------------------------
    first_expiry = expiry_df.iloc[0]['Expiries']  # e.g. "2025-12-25" or similar
    logger.info(f"Selected expiry for test: {first_expiry}")

    # Filter options data for that expiry
    mask = (options_df['ExpiryDate'] == first_expiry)
    df_expiry = options_df[mask].copy()

    # Sort by Date, Time (important for time-series logic)
    df_expiry.sort_values(['Date', 'Time'], inplace=True)
    logger.info(f"Filtered option data shape: {df_expiry.shape}")

    # --------------------------------------------------------
    # C) Merge with PCR data on (Date, Time)
    #    This ensures we have a PCR value for each row
    # --------------------------------------------------------
    df_merged = pd.merge(df_expiry, pcr_df, on=['Date', 'Time'], how='left')
    df_merged.sort_values(['Date', 'Time'], inplace=True)

    # Now df_merged has columns:
    #  [Date, Time, Open, High, Low, Close, Volume, OI, OptionType,
    #   StrikePrice, Ticker, Delta, Close_Index, ExpiryDate, PCR]
    logger.info(f"Merged shape: {df_merged.shape}")

    # For the GPU backtest, we only need:
    #  - the PCR array
    #  - the close-price array
    # In a real strategy, you might need Delta or other columns.

    # Drop rows with missing PCR, if any
    df_merged = df_merged.dropna(subset=['PCR']).reset_index(drop=True)
    n_rows = len(df_merged)
    logger.info(f"Rows after dropping missing PCR: {n_rows}")

    # --------------------------------------------------------
    # D) Prepare NumPy arrays for the GPU
    # --------------------------------------------------------
    pcr_np = df_merged['PCR'].values.astype(np.float32)
    close_np = df_merged['Close'].values.astype(np.float32)

    # Copy to GPU
    pcr_gpu = cuda.to_device(pcr_np)
    close_gpu = cuda.to_device(close_np)

    # We'll store the result in a small array of length 1 (one variant).
    # With multiple variants, you'd create a larger array: length = #variants
    pnl_gpu = cuda.device_array(1, dtype=np.float32)

    # --------------------------------------------------------
    # E) Launch the GPU kernel
    # --------------------------------------------------------
    # We use 1 block, 1 thread for demonstration (since we only run 1 variant)
    threads_per_block = 1
    blocks = 1

    logger.info("Launching GPU kernel for single-expiry demonstration...")
    backtest_single_expiry_kernel[blocks, threads_per_block](
        pcr_gpu,
        close_gpu,
        # JSON thresholds
        BULLISH_ENTRY_PCR,
        HEAVILY_BULLISH_PCR,
        BEARISH_ENTRY_PCR,
        HEAVILY_BEARISH_PCR,
        UNIVERSAL_STOP_LOSS,
        UNIVERSAL_TARGET,
        # Output
        pnl_gpu,
    )

    # Synchronize
    cuda.synchronize()

    # --------------------------------------------------------
    # F) Retrieve the result
    # --------------------------------------------------------
    pnl_result = pnl_gpu.copy_to_host()[0]
    logger.info(f"Backtest completed. Final PnL: {pnl_result:.2f}")

    # --------------------------------------------------------
    # G) Next Steps
    # --------------------------------------------------------
    # - If this works for the first expiry, you can loop over all expiries
    #   in expiry_df.
    # - If you have multiple variants (parameter sets), create larger arrays
    #   for your thresholds, launch more threads, and index with
    #   "thread_id = cuda.grid(1)" inside the kernel.
    # - Test memory usage and performance on large data.


if __name__ == "__main__":
    main()