Untitled
unknown
plain_text
10 months ago
10 kB
6
Indexable
# Random-forest price-direction classifier with a simple long/short backtest.
#
# The script builds technical indicators from daily OHLC CSV exports, labels
# each row by the close `future_days` ahead, trains a RandomForestClassifier
# on several coins, and then evaluates/backtests the model on a later
# Ethereum file.
#
# scikit-learn, matplotlib and seaborn are imported inside the functions that
# use them so that the indicator and backtest helpers stay importable (and
# unit-testable) in environments that only have pandas/numpy installed.
import numpy as np
import pandas as pd

# Input files. The training files cover 2019 through 2023-05 and the test file
# covers 2023-06 through 2024, judging by the file names.
TRAIN_FILE_PATHS = [
    '/Users/eric/Desktop/data/乙太坊歷史數據 2019-2023-5.csv',
    '/Users/eric/Desktop/data/比特幣歷史數據 2019-2023-5.csv',
    '/Users/eric/Desktop/data/Solana歷史數據 2019-2023-5.csv',
    '/Users/eric/Desktop/data/狗狗幣歷史數據 2019-2023-5.csv',
]
TEST_FILE_PATHS = ['/Users/eric/Desktop/data/乙太坊歷史數據 2023-6-2024.csv']

# Columns produced by prepare_data() that are fed to the model.
FEATURE_COLUMNS = [
    'MA20', 'EMA50', 'RSI', 'MACD', 'MACD_signal', 'MACD_hist',
    'upper_band', 'middle_band', 'lower_band', 'Stochastic', 'ADX',
    'Return', 'Volatility',
]

# create_labels() emits three classes, so every report must name three.
CLASS_LABELS = [-1, 0, 1]
CLASS_NAMES = ['Down', 'Flat', 'Up']

# Multipliers for the unit suffixes that convert_volume() understands.
_VOLUME_MULTIPLIERS = {'K': 1e3, 'M': 1e6, 'B': 1e9}


# ---------------------------------------------------------------------------
# Technical indicators
# ---------------------------------------------------------------------------

def moving_average(values: pd.Series, window: int) -> pd.Series:
    """Return the simple moving average of ``values`` over ``window`` rows."""
    return values.rolling(window=window).mean()


def exponential_moving_average(values: pd.Series, span: int) -> pd.Series:
    """Return the exponential moving average of ``values`` (``adjust=False``)."""
    return values.ewm(span=span, adjust=False).mean()


def relative_strength_index(values: pd.Series, n: int = 14) -> pd.Series:
    """Return the RSI of ``values`` using simple rolling means of gains/losses.

    A window with no losses yields 100; a window with neither gains nor
    losses yields NaN (0 / 0).
    """
    delta = values.diff()
    gain = delta.where(delta > 0, 0).rolling(window=n).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=n).mean()
    rs = gain / loss
    return 100 - (100 / (1 + rs))


def macd(values: pd.Series, slow: int = 26, fast: int = 12, signal: int = 9) -> tuple:
    """Return ``(macd_line, signal_line, histogram)`` for ``values``."""
    fast_ema = values.ewm(span=fast, adjust=False).mean()
    slow_ema = values.ewm(span=slow, adjust=False).mean()
    macd_line = fast_ema - slow_ema
    signal_line = macd_line.ewm(span=signal, adjust=False).mean()
    return macd_line, signal_line, macd_line - signal_line


def bollinger_bands(values: pd.Series, window: int = 20, no_of_std: float = 2) -> tuple:
    """Return ``(upper_band, middle_band, lower_band)`` for ``values``."""
    rolling = values.rolling(window=window)
    mean = rolling.mean()
    offset = rolling.std() * no_of_std
    return mean + offset, mean, mean - offset


def stochastic_oscillator(values: pd.Series, n: int = 14) -> pd.Series:
    """Return %K computed from the rolling min/max of ``values`` itself.

    A window in which every value is equal produces NaN (0 / 0).
    """
    low = values.rolling(window=n).min()
    high = values.rolling(window=n).max()
    return 100 * (values - low) / (high - low)


def average_directional_index(data: pd.DataFrame, n: int = 14) -> pd.Series:
    """Return the ADX of ``data`` using simple rolling means for smoothing.

    Args:
        data: Frame with '高' (high), '低' (low) and '收市' (close) columns.
        n: Window used for the ATR, the DI lines and the final ADX average.

    Returns:
        A Series aligned with ``data.index``.
    """
    high, low, close = data['高'], data['低'], data['收市']

    up_move = high.diff()
    # Downward movement is how far the low FELL, i.e. the negated diff.
    # Using the raw diff would measure rising lows instead.
    down_move = -low.diff()

    # Series.where keeps data's index, so the divisions below align with the
    # ATR even when the frame does not have a default RangeIndex.
    plus_dm = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
    minus_dm = down_move.where((down_move > up_move) & (down_move > 0), 0.0)

    prev_close = close.shift()
    true_range = np.maximum(
        high - low,
        np.maximum((high - prev_close).abs(), (low - prev_close).abs()),
    )
    atr = true_range.rolling(window=n).mean()

    plus_di = 100 * (plus_dm.rolling(window=n).mean() / atr)
    minus_di = 100 * (minus_dm.rolling(window=n).mean() / atr)
    dx = 100 * ((plus_di - minus_di) / (plus_di + minus_di)).abs()
    return dx.rolling(window=n).mean()


# ---------------------------------------------------------------------------
# Feature / label construction
# ---------------------------------------------------------------------------

def prepare_data(data: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``data`` with all indicator columns added.

    Rows containing any NaN (indicator warm-up periods included) are dropped.
    The input frame is not modified.
    """
    data = data.copy()
    close = data['收市']

    data['MA20'] = moving_average(close, 20)
    data['EMA50'] = exponential_moving_average(close, 50)
    data['RSI'] = relative_strength_index(close)
    data['MACD'], data['MACD_signal'], data['MACD_hist'] = macd(close)
    data['upper_band'], data['middle_band'], data['lower_band'] = bollinger_bands(close)
    data['Stochastic'] = stochastic_oscillator(close)
    data['ADX'] = average_directional_index(data)
    data['Return'] = close.pct_change()
    data['Volatility'] = data['Return'].rolling(window=20).std()
    return data.dropna()


def create_labels(data: pd.DataFrame, future_days: int = 5, threshold: float = 0.01) -> pd.DataFrame:
    """Return a copy of ``data`` with 'Future Price' and 'Label' columns.

    'Label' is 1 when the close ``future_days`` rows ahead is more than
    ``threshold`` above the current close, -1 when it is more than
    ``threshold`` below, and 0 otherwise. Rows without a future price (the
    last ``future_days`` rows) are dropped. The input frame is not modified.
    """
    labeled = data.copy()
    close = labeled['收市']
    labeled['Future Price'] = close.shift(-future_days)
    labeled['Label'] = 0
    labeled.loc[labeled['Future Price'] > close * (1 + threshold), 'Label'] = 1
    labeled.loc[labeled['Future Price'] < close * (1 - threshold), 'Label'] = -1
    return labeled.dropna()


def train_model(X_train, y_train):
    """Grid-search a RandomForestClassifier and return the best estimator."""
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import GridSearchCV

    param_grid = {
        'n_estimators': [100, 200],
        'max_depth': [None, 10, 20],
        'min_samples_split': [2, 5, 10],
    }
    grid_search = GridSearchCV(
        RandomForestClassifier(random_state=42),
        param_grid,
        cv=3,
        n_jobs=-1,
        verbose=2,
    )
    grid_search.fit(X_train, y_train)
    return grid_search.best_estimator_


# ---------------------------------------------------------------------------
# Data loading
# ---------------------------------------------------------------------------

def convert_volume(volume_str) -> float:
    """Convert a volume string with an optional K/M/B suffix to a float.

    Thousands separators are removed. An empty string or a lone '-' (a
    placeholder for "no data") returns NaN instead of raising, so the row can
    be removed by a later ``dropna``.
    """
    text = str(volume_str).strip().replace(',', '')
    if text in ('', '-'):
        return float('nan')
    multiplier = _VOLUME_MULTIPLIERS.get(text[-1].upper())
    if multiplier is not None:
        return float(text[:-1]) * multiplier
    return float(text)


def load_price_data(file_path: str) -> pd.DataFrame:
    """Read one price CSV and return it cleaned, featurized and labeled.

    The file is expected to contain the columns '收市', '開市', '高', '低',
    '成交量' and '升跌(%)'. Rows are reversed after cleaning so that they run
    from oldest to newest before the rolling indicators are computed.
    """
    data = pd.read_csv(file_path)

    # Price columns may carry thousands separators.
    for column in ('收市', '開市', '高', '低'):
        data[column] = (
            data[column].astype(str).str.replace(',', '', regex=False).astype(float)
        )
    data['成交量'] = data['成交量'].astype(str).apply(convert_volume)

    # Strip the percent sign and separators from the daily-change column.
    change = (
        data['升跌(%)'].astype(str)
        .str.replace('%', '', regex=False)
        .str.replace(',', '', regex=False)
    )
    data['升跌(%)'] = pd.to_numeric(change, errors='coerce')

    # Drop anything the cleaning above could not parse.
    data = data.dropna()

    # Re-order from oldest to newest.
    data = data.iloc[::-1].reset_index(drop=True)

    return create_labels(prepare_data(data))


# ---------------------------------------------------------------------------
# Backtest
# ---------------------------------------------------------------------------

def simulate_trading(data, model, X_scaled, initial_cash=100):
    """Backtest ``model``'s signals with an all-in long/short strategy.

    At most one action is taken per row:

    * signal 1  -> cover an open short, otherwise buy with all cash if flat;
    * signal -1 -> sell an open long, otherwise open a short if flat.

    A short is opened with a notional equal to the available cash; the cash is
    kept as collateral and the profit or loss is added when the short is
    covered. Cash can therefore become negative if the price more than
    doubles while short; no further trades are opened in that case.

    Args:
        data: Frame with '收市' (close) and '日期' (date) columns; its last
            ``len(X_scaled)`` rows are paired with the predictions.
        model: Object with a ``predict(X)`` method returning -1/0/1 signals.
        X_scaled: Feature matrix passed to ``model.predict``.
        initial_cash: Starting cash.

    Returns:
        ``(final_value, trade_log)`` where ``final_value`` marks any open
        position to the last close and ``trade_log`` is a list of strings.
    """
    if len(X_scaled) == 0:
        # Nothing to trade on; iloc[-0:] would otherwise select every row.
        return initial_cash, [f"最终现金: ${initial_cash:.2f}"]

    # Work on a copy so the caller's frame does not gain a 'Predicted' column.
    data = data.iloc[-len(X_scaled):].copy()
    data['Predicted'] = model.predict(X_scaled)

    cash = initial_cash
    holding = 0         # units held long
    short_position = 0  # units sold short
    short_entry = 0     # price at which the short was opened
    trade_log = []

    for signal, price, date in zip(data['Predicted'], data['收市'], data['日期']):
        if signal == 1:
            if short_position > 0:
                # Price expected to rise: close the short first.
                cash += short_position * (short_entry - price)
                trade_log.append(
                    f"平仓 {short_position:.4f} 以太坊 @ {price} on {date},交易金额: ${cash:.2f}"
                )
                short_position = 0
            elif holding == 0 and cash > 0:
                # Price expected to rise: go long with all cash.
                holding = cash / price
                cash = 0
                trade_log.append(f"买入 {holding:.4f} 以太坊 @ {price} on {date}")
        elif signal == -1:
            if holding > 0:
                # Price expected to fall: liquidate the long.
                cash = holding * price
                trade_log.append(
                    f"卖出 {holding:.4f} 以太坊 @ {price} on {date},交易金额: ${cash:.2f}"
                )
                holding = 0
            elif short_position == 0 and cash > 0:
                # Price expected to fall: open a short sized to current cash.
                short_position = cash / price
                short_entry = price
                trade_log.append(f"做空 {short_position:.4f} 以太坊 @ {price} on {date}")

    # Mark whatever is still open to the last close.
    last_price = data.iloc[-1]['收市']
    if holding > 0:
        final_value = holding * last_price
        trade_log.append(
            f"持有 {holding:.4f} 以太坊到最后 @ {last_price},总价值: ${final_value:.2f}"
        )
    elif short_position > 0:
        final_value = cash + short_position * (short_entry - last_price)
        trade_log.append(
            f"持有做空头寸 {short_position:.4f} 以太坊到最后 @ {last_price},总价值: ${final_value:.2f}"
        )
    else:
        final_value = cash
        trade_log.append(f"最终现金: ${final_value:.2f}")

    return final_value, trade_log


# ---------------------------------------------------------------------------
# Script entry point
# ---------------------------------------------------------------------------

def main():
    """Train on the training files, then backtest and evaluate on the test file."""
    import matplotlib.pyplot as plt
    import seaborn as sns
    from sklearn.feature_selection import SelectKBest, f_classif
    from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
    from sklearn.preprocessing import StandardScaler

    # Load and combine the training data.
    eth_data_train = pd.concat(
        [load_price_data(path) for path in TRAIN_FILE_PATHS], ignore_index=True
    )
    X_train = eth_data_train[FEATURE_COLUMNS]
    y_train = eth_data_train['Label']

    # Feature selection and scaling are fitted on the training data only.
    selector = SelectKBest(f_classif, k=10)
    X_train_new = selector.fit_transform(X_train, y_train)
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train_new)

    # Train the model.
    eth_model = train_model(X_train_scaled, y_train)

    # Load the test data and apply the already-fitted transforms.
    eth_data_test = pd.concat(
        [load_price_data(path) for path in TEST_FILE_PATHS], ignore_index=True
    )
    X_test = eth_data_test[FEATURE_COLUMNS]
    X_test_scaled = scaler.transform(selector.transform(X_test))

    # Run the trading simulation.
    final_cash, trade_log = simulate_trading(eth_data_test, eth_model, X_test_scaled)
    print("Initial cash: $100")
    print(f"Final cash after trading: ${final_cash:.2f}")

    # Print the trade log.
    for log in trade_log:
        print(log)

    # Confusion matrix. `labels` is passed explicitly so the matrix is always
    # 3x3 and matches the tick labels even if a class is absent.
    y_test = eth_data_test['Label']
    y_pred = eth_model.predict(X_test_scaled)
    cm = confusion_matrix(y_test, y_pred, labels=CLASS_LABELS)

    plt.figure(figsize=(10, 7))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=CLASS_NAMES, yticklabels=CLASS_NAMES)
    plt.title('Ethereum Confusion Matrix')
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.show()

    # Print the evaluation metrics.
    print(f"Accuracy: {accuracy_score(y_test, y_pred):.2f}")
    print("Classification Report:")
    print(classification_report(y_test, y_pred,
                                labels=CLASS_LABELS, target_names=CLASS_NAMES))


if __name__ == "__main__":
    main()
Editor is loading...
Leave a Comment