import logging
import pandas as pd
import numpy as np
import glob
import re
from datetime import datetime
import time
from numba import cuda, int32, float32
# -------------------------------
# 1) Configure Logging
# -------------------------------
logger = logging.getLogger("GPUBacktest")
logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
logger.addHandler(ch)
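# Attach a timestamped format to the console handler (an optional sketch).
ch.setFormatter(logging.Formatter("%(asctime)s %(levelname)s: %(message)s"))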
# -------------------------------
# 2) Hardcoded JSON-Like Conditions
# -------------------------------
BULLISH_ENTRY_PCR = 0.7 # "Bullish": PCR > 0.7 => entry: PE (position_side = +1)
HEAVILY_BULLISH_PCR = 1.2375 # "Heavily Bullish": PCR > 1.2375 => exit CE
BEARISH_ENTRY_PCR = 1.1875 # "Bearish": PCR < 1.1875 => entry: CE (position_side = -1)
HEAVILY_BEARISH_PCR = 0.65 # "Heavily Bearish": PCR < 0.65 => exit PE
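# PCR here is the put-call ratio read precomputed from MAIN_NIFTY50_PCR.pkl
# (conventionally total put OI / total call OI -- an assumption about how that
# file was built). Note the entry bands overlap: 0.7 < PCR < 1.1875 satisfies
# both conditions; the kernel checks the bullish entry first, so the PE entry
# takes precedence in that range.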
# -------------------------------
# 3) GPU Kernel: One Thread Per Expiry
# -------------------------------
@cuda.jit
def backtest_multiple_expiries_kernel(
pcr_array,
close_array,
expiry_offsets, # shape (num_expiries, 2): [ [start_index, length], ... ]
bull_entry, heavy_bull_exit,
bear_entry, heavy_bear_exit,
result_pnl # shape (num_expiries,)
):
"""
Each thread (thread_id = e) processes one expiry's slice of data.
pcr_array, close_array: float32 arrays combining all expiries.
expiry_offsets[e, 0] = start index for expiry e
expiry_offsets[e, 1] = length (number of bars) for expiry e
    Runs a time-sequential loop over that expiry's bars (PCR/Close pairs);
    the final PnL is stored in result_pnl[e].
"""
e = cuda.grid(1) # expiry_id
if e >= result_pnl.size:
return
start_idx = expiry_offsets[e, 0]
length = expiry_offsets[e, 1]
in_position = False
position_side = 0.0 # +1 => PE, -1 => CE
entry_price = 0.0
total_pnl = 0.0
# Time-sequential loop over this expiry's slice
for i in range(length):
idx = start_idx + i
pcr_val = pcr_array[idx]
close_val = close_array[idx]
# If in position, check exit conditions
if in_position:
# PE => exit if pcr_val < heavy_bear_exit
if position_side > 0 and pcr_val < heavy_bear_exit:
current_pnl = (close_val - entry_price) * position_side
total_pnl += current_pnl
in_position = False
# CE => exit if pcr_val > heavy_bull_exit
elif position_side < 0 and pcr_val > heavy_bull_exit:
current_pnl = (close_val - entry_price) * position_side
total_pnl += current_pnl
in_position = False
# If not in position, check entry signals
if not in_position:
# Bullish => PCR > bull_entry => open PE
if pcr_val > bull_entry:
in_position = True
position_side = 1.0
entry_price = close_val
# Bearish => PCR < bear_entry => open CE
elif pcr_val < bear_entry:
in_position = True
position_side = -1.0
entry_price = close_val
# End of loop, close if still in position
if in_position and length > 0:
last_close = close_array[start_idx + length - 1]
final_pnl = (last_close - entry_price) * position_side
total_pnl += final_pnl
result_pnl[e] = total_pnl
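# -------------------------------
# CPU reference (for validation)
# -------------------------------
# A plain-Python mirror of the kernel's state machine -- a sketch for
# sanity-checking the GPU results on a single expiry's slice.
def backtest_single_expiry_cpu(pcr_vals, close_vals,
                               bull_entry=BULLISH_ENTRY_PCR,
                               heavy_bull_exit=HEAVILY_BULLISH_PCR,
                               bear_entry=BEARISH_ENTRY_PCR,
                               heavy_bear_exit=HEAVILY_BEARISH_PCR):
    in_position = False
    position_side = 0.0  # +1 => PE, -1 => CE
    entry_price = 0.0
    total_pnl = 0.0
    for pcr_val, close_val in zip(pcr_vals, close_vals):
        if in_position:
            if position_side > 0 and pcr_val < heavy_bear_exit:
                total_pnl += (close_val - entry_price) * position_side
                in_position = False
            elif position_side < 0 and pcr_val > heavy_bull_exit:
                total_pnl += (close_val - entry_price) * position_side
                in_position = False
        if not in_position:
            if pcr_val > bull_entry:
                in_position, position_side, entry_price = True, 1.0, close_val
            elif pcr_val < bear_entry:
                in_position, position_side, entry_price = True, -1.0, close_val
    # Close any open position at the last bar, mirroring the kernel.
    if in_position and len(close_vals) > 0:
        total_pnl += (close_vals[-1] - entry_price) * position_side
    return total_pnl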
def main():
# ---------------------------------------------------
# A) Read master data: PCR
# ---------------------------------------------------
logger.info("Reading PCR data...")
pcr_df = pd.read_pickle("data/MAIN_NIFTY50_PCR.pkl") # columns: [Date, Time, PCR]
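    # Fail fast if the pickle lacks the expected columns (a small defensive check).
    missing = {"Date", "Time", "PCR"} - set(pcr_df.columns)
    if missing:
        raise KeyError(f"PCR data is missing columns: {sorted(missing)}")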
    # Expiries are inferred from the option-chain file names below; if you keep
    # a separate expiry-metadata DataFrame, this is where you could cross-check it.
# ---------------------------------------------------
# B) Gather OptionChain files for multiple expiries
# ---------------------------------------------------
pattern = "data/MAIN_NIFTY50_OPTIONS_*_OptionChain.pkl"
files = glob.glob(pattern)
logger.info(f"Found {len(files)} option chain files: {files}")
    # Parse the expiry date ("YYYY_MM_DD") from each filename; we assume it
    # ends with "..._OPTIONS_YYYY_MM_DD_OptionChain.pkl". (Anchoring on
    # "NIFTY50_" would never match, since "OPTIONS_" follows it in the glob.)
    date_pattern = re.compile(r"OPTIONS_(\d{4})_(\d{2})_(\d{2})_OptionChain\.pkl$")
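    # e.g. "data/MAIN_NIFTY50_OPTIONS_2021_01_07_OptionChain.pkl"
    #      -> groups ("2021", "01", "07")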
    # Collect per-expiry segments in lists (the total length isn't known up
    # front), then np.concatenate them into single big arrays afterwards.
pcr_segments = []
close_segments = []
expiry_offsets = []
expiry_labels = [] # track the string date for reporting
current_start_idx = 0 # cumulative offset
# Process each file
for file_path in sorted(files):
match = date_pattern.search(file_path)
if not match:
logger.warning(f"Skipping file (date parse failed): {file_path}")
continue
yyyy, mm, dd = match.groups()
expiry_str = f"{yyyy}-{mm}-{dd}" # e.g. "2021-01-07"
logger.info(f"Processing {file_path}, parsed expiry date: {expiry_str}")
# 1) Load option data
options_df = pd.read_pickle(file_path)
        # 2) Convert the expiry date to "DD-MM-YYYY", the format the ExpiryDate
        #    column is assumed to use
dt = datetime.strptime(expiry_str, "%Y-%m-%d")
expiry_ddmmyyyy = dt.strftime("%d-%m-%Y")
# 3) Filter DataFrame for just that expiry
df_exp = options_df[options_df['ExpiryDate'] == expiry_ddmmyyyy].copy()
df_exp.sort_values(['Date','Time'], inplace=True)
if df_exp.empty:
logger.warning(f"No matching rows for expiry {expiry_ddmmyyyy}, skipping.")
continue
# 4) Merge with PCR on [Date, Time]
df_merged = pd.merge(df_exp, pcr_df, on=['Date','Time'], how='left')
df_merged.sort_values(['Date','Time'], inplace=True)
df_merged.dropna(subset=['PCR'], inplace=True)
if df_merged.empty:
logger.warning(f"After merging with PCR, no data remains for {expiry_str}, skipping.")
continue
pcr_vals = df_merged['PCR'].values.astype(np.float32)
close_vals = df_merged['Close'].values.astype(np.float32)
segment_len = len(pcr_vals)
# 5) Store segment arrays
pcr_segments.append(pcr_vals)
close_segments.append(close_vals)
# Build offset info
expiry_offsets.append((current_start_idx, segment_len))
expiry_labels.append(expiry_str)
current_start_idx += segment_len
# If no valid data found at all, bail out
num_expiries = len(expiry_offsets)
if num_expiries == 0:
logger.info("No valid expiry data found. Exiting.")
return
# ---------------------------------------------------
# C) Combine all data into big arrays
# ---------------------------------------------------
logger.info("Combining data for all expiries into single arrays...")
combined_pcr = np.concatenate(pcr_segments) # shape (sum_of_lengths,)
combined_close = np.concatenate(close_segments) # same shape
offsets_np = np.array(expiry_offsets, dtype=np.int32) # shape (num_expiries, 2)
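    # e.g. with two expiries of 300 and 280 bars:
    #   offsets_np == [[0, 300], [300, 280]]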
# ---------------------------------------------------
# D) Copy to GPU
# ---------------------------------------------------
logger.info(f"Total bars across all {num_expiries} expiries: {combined_pcr.shape[0]}")
pcr_gpu = cuda.to_device(combined_pcr)
close_gpu = cuda.to_device(combined_close)
offsets_gpu = cuda.to_device(offsets_np)
# We'll have 1 thread per expiry => result array of length num_expiries
result_gpu = cuda.device_array(num_expiries, dtype=np.float32)
# ---------------------------------------------------
# E) Launch the kernel: one thread per expiry
# ---------------------------------------------------
threads_per_block = 128
blocks = (num_expiries + threads_per_block - 1) // threads_per_block
logger.info(f"Launching kernel with {blocks} blocks, {threads_per_block} threads/block, for {num_expiries} expiries.")
start_time = time.time()
backtest_multiple_expiries_kernel[blocks, threads_per_block](
pcr_gpu, close_gpu, offsets_gpu,
BULLISH_ENTRY_PCR, HEAVILY_BULLISH_PCR,
BEARISH_ENTRY_PCR, HEAVILY_BEARISH_PCR,
result_gpu
)
cuda.synchronize()
end_time = time.time()
    elapsed_ms = (end_time - start_time) * 1e3
# ---------------------------------------------------
# F) Retrieve results
# ---------------------------------------------------
result_cpu = result_gpu.copy_to_host() # shape (num_expiries,)
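    # Optional cross-check (a sketch): re-run the first expiry through the CPU
    # reference and compare against the kernel's output.
    s, n = expiry_offsets[0]
    cpu_pnl = backtest_single_expiry_cpu(combined_pcr[s:s+n], combined_close[s:s+n])
    logger.info(f"CPU check for {expiry_labels[0]}: {cpu_pnl:.2f} (GPU: {result_cpu[0]:.2f})")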
# ---------------------------------------------------
# G) Reporting
# ---------------------------------------------------
logger.info("Backtest results for each expiry:")
for i, exp_date in enumerate(expiry_labels):
pnl = result_cpu[i]
logger.info(f" {exp_date} => PnL: {pnl:.2f}")
logger.info(f"Time taken by kernal: {time_unit:.2f}ns")
if __name__ == "__main__":
    main()