Untitled
unknown
plain_text
13 days ago
9.3 kB
1
Indexable
import logging import pandas as pd import numpy as np import glob import re from datetime import datetime from numba import cuda, int32, float32 # ------------------------------- # 1) Configure Logging # ------------------------------- logger = logging.getLogger("GPUBacktest") logger.setLevel(logging.INFO) ch = logging.StreamHandler() logger.addHandler(ch) # ------------------------------- # 2) Hardcoded JSON-Like Conditions # ------------------------------- BULLISH_ENTRY_PCR = 0.7 # "Bullish": PCR > 0.7 => entry: PE (position_side = +1) HEAVILY_BULLISH_PCR = 1.2375 # "Heavily Bullish": PCR > 1.2375 => exit CE BEARISH_ENTRY_PCR = 1.1875 # "Bearish": PCR < 1.1875 => entry: CE (position_side = -1) HEAVILY_BEARISH_PCR = 0.65 # "Heavily Bearish": PCR < 0.65 => exit PE # ------------------------------- # 3) GPU Kernel: One Thread Per Expiry # ------------------------------- @cuda.jit def backtest_multiple_expiries_kernel( pcr_array, close_array, expiry_offsets, # shape (num_expiries, 2): [ [start_index, length], ... ] bull_entry, heavy_bull_exit, bear_entry, heavy_bear_exit, result_pnl # shape (num_expiries,) ): """ Each thread (thread_id = e) processes one expiry's slice of data. pcr_array, close_array: float32 arrays combining all expiries. expiry_offsets[e, 0] = start index for expiry e expiry_offsets[e, 1] = length (number of bars) for expiry e We do a time-sequential loop for that expiry (PCR/Close data). The final PnL is stored in result_pnl[e]. """ e = cuda.grid(1) # expiry_id if e >= result_pnl.size: return start_idx = expiry_offsets[e, 0] length = expiry_offsets[e, 1] in_position = False position_side = 0.0 # +1 => PE, -1 => CE entry_price = 0.0 total_pnl = 0.0 # Time-sequential loop over this expiry's slice for i in range(length): idx = start_idx + i pcr_val = pcr_array[idx] close_val = close_array[idx] # If in position, check exit conditions if in_position: # PE => exit if pcr_val < heavy_bear_exit if position_side > 0 and pcr_val < heavy_bear_exit: current_pnl = (close_val - entry_price) * position_side total_pnl += current_pnl in_position = False # CE => exit if pcr_val > heavy_bull_exit elif position_side < 0 and pcr_val > heavy_bull_exit: current_pnl = (close_val - entry_price) * position_side total_pnl += current_pnl in_position = False # If not in position, check entry signals if not in_position: # Bullish => PCR > bull_entry => open PE if pcr_val > bull_entry: in_position = True position_side = 1.0 entry_price = close_val # Bearish => PCR < bear_entry => open CE elif pcr_val < bear_entry: in_position = True position_side = -1.0 entry_price = close_val # End of loop, close if still in position if in_position and length > 0: last_close = close_array[start_idx + length - 1] final_pnl = (last_close - entry_price) * position_side total_pnl += final_pnl result_pnl[e] = total_pnl def main(): # --------------------------------------------------- # A) Read master data: PCR # --------------------------------------------------- logger.info("Reading PCR data...") pcr_df = pd.read_pickle("data/MAIN_NIFTY50_PCR.pkl") # columns: [Date, Time, PCR] # For demonstration, we won't use expiry_df here. We'll rely on file-naming for expiries. # But if you do have an expiry_df, you can cross-check or store statuses, etc. # --------------------------------------------------- # B) Gather OptionChain files for multiple expiries # --------------------------------------------------- pattern = "data/MAIN_NIFTY50_OPTIONS_*_OptionChain.pkl" files = glob.glob(pattern) logger.info(f"Found {len(files)} option chain files: {files}") # We want to parse the date from each filename: "YYYY_MM_DD" # We assume it ends with "..._YYYY_MM_DD_OptionChain.pkl" date_pattern = re.compile(r"NIFTY50_(\d{4})_(\d{2})_(\d{2})_OptionChain\.pkl$") # We'll collect merged data for all expiries # combined_pcr_list, combined_close_list = big arrays to store data for all # but we don't know total length up front, so we store partial results in lists, then np.concatenate pcr_segments = [] close_segments = [] expiry_offsets = [] expiry_labels = [] # track the string date for reporting current_start_idx = 0 # cumulative offset # Process each file for file_path in sorted(files): match = date_pattern.search(file_path) if not match: logger.warning(f"Skipping file (date parse failed): {file_path}") continue yyyy, mm, dd = match.groups() expiry_str = f"{yyyy}-{mm}-{dd}" # e.g. "2021-01-07" logger.info(f"Processing {file_path}, parsed expiry date: {expiry_str}") # 1) Load option data options_df = pd.read_pickle(file_path) # 2) Convert the expiry date to "DD-MM-YYYY" if your data uses that format dt = datetime.strptime(expiry_str, "%Y-%m-%d") expiry_ddmmyyyy = dt.strftime("%d-%m-%Y") # 3) Filter DataFrame for just that expiry df_exp = options_df[options_df['ExpiryDate'] == expiry_ddmmyyyy].copy() df_exp.sort_values(['Date','Time'], inplace=True) if df_exp.empty: logger.warning(f"No matching rows for expiry {expiry_ddmmyyyy}, skipping.") continue # 4) Merge with PCR on [Date, Time] df_merged = pd.merge(df_exp, pcr_df, on=['Date','Time'], how='left') df_merged.sort_values(['Date','Time'], inplace=True) df_merged.dropna(subset=['PCR'], inplace=True) if df_merged.empty: logger.warning(f"After merging with PCR, no data remains for {expiry_str}, skipping.") continue pcr_vals = df_merged['PCR'].values.astype(np.float32) close_vals = df_merged['Close'].values.astype(np.float32) segment_len = len(pcr_vals) # 5) Store segment arrays pcr_segments.append(pcr_vals) close_segments.append(close_vals) # Build offset info expiry_offsets.append((current_start_idx, segment_len)) expiry_labels.append(expiry_str) current_start_idx += segment_len # If no valid data found at all, bail out num_expiries = len(expiry_offsets) if num_expiries == 0: logger.info("No valid expiry data found. Exiting.") return # --------------------------------------------------- # C) Combine all data into big arrays # --------------------------------------------------- logger.info("Combining data for all expiries into single arrays...") combined_pcr = np.concatenate(pcr_segments) # shape (sum_of_lengths,) combined_close = np.concatenate(close_segments) # same shape offsets_np = np.array(expiry_offsets, dtype=np.int32) # shape (num_expiries, 2) # --------------------------------------------------- # D) Copy to GPU # --------------------------------------------------- logger.info(f"Total bars across all {num_expiries} expiries: {combined_pcr.shape[0]}") pcr_gpu = cuda.to_device(combined_pcr) close_gpu = cuda.to_device(combined_close) offsets_gpu = cuda.to_device(offsets_np) # We'll have 1 thread per expiry => result array of length num_expiries result_gpu = cuda.device_array(num_expiries, dtype=np.float32) # --------------------------------------------------- # E) Launch the kernel: one thread per expiry # --------------------------------------------------- threads_per_block = 128 blocks = (num_expiries + threads_per_block - 1) // threads_per_block logger.info(f"Launching kernel with {blocks} blocks, {threads_per_block} threads/block, for {num_expiries} expiries.") backtest_multiple_expiries_kernel[blocks, threads_per_block]( pcr_gpu, close_gpu, offsets_gpu, BULLISH_ENTRY_PCR, HEAVILY_BULLISH_PCR, BEARISH_ENTRY_PCR, HEAVILY_BEARISH_PCR, result_gpu ) cuda.synchronize() # --------------------------------------------------- # F) Retrieve results # --------------------------------------------------- result_cpu = result_gpu.copy_to_host() # shape (num_expiries,) # --------------------------------------------------- # G) Reporting # --------------------------------------------------- logger.info("Backtest results for each expiry:") for i, exp_date in enumerate(expiry_labels): pnl = result_cpu[i] logger.info(f" {exp_date} => PnL: {pnl:.2f}") if __name__ == "__main__": main()
Editor is loading...
Leave a Comment