fix

mail@pastecode.io avatar
unknown
plain_text
a month ago
11 kB
3
Indexable
Never
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest, f_classif
import matplotlib.pyplot as plt
import seaborn as sns

# 定义技术指标的函数
def moving_average(values, window):
    return values.rolling(window=window).mean()

def exponential_moving_average(values, span):
    return values.ewm(span=span, adjust=False).mean()

def relative_strength_index(values, n=14):
    delta = values.diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=n).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=n).mean()
    rs = gain / loss
    return 100 - (100 / (1 + rs))

def macd(values, slow=26, fast=12, signal=9):
    exp1 = values.ewm(span=fast, adjust=False).mean()
    exp2 = values.ewm(span=slow, adjust=False).mean()
    macd_line = exp1 - exp2
    signal_line = macd_line.ewm(span=signal, adjust=False).mean()
    hist = macd_line - signal_line
    return macd_line, signal_line, hist

def bollinger_bands(values, window=20, no_of_std=2):
    mean = values.rolling(window=window).mean()
    std = values.rolling(window=window).std()
    upper_band = mean + (std * no_of_std)
    lower_band = mean - (std * no_of_std)
    return upper_band, mean, lower_band

def stochastic_oscillator(values, n=14):
    low = values.rolling(window=n).min()
    high = values.rolling(window=n).max()
    return 100 * (values - low) / (high - low)

def average_directional_index(data, n=14):
    delta_high = data['高'].diff()
    delta_low = data['低'].diff()
    plus_dm = pd.Series(np.where((delta_high > delta_low) & (delta_high > 0), delta_high, 0))
    minus_dm = pd.Series(np.where((delta_low > delta_high) & (delta_low > 0), delta_low, 0))
    tr = pd.Series(np.maximum((data['高'] - data['低']), np.maximum(abs(data['高'] - data['收市'].shift()), abs(data['低'] - data['收市'].shift()))))
    atr = tr.rolling(window=n).mean()
    plus_di = 100 * (plus_dm.rolling(window=n).mean() / atr)
    minus_di = 100 * (minus_dm.rolling(window=n).mean() / atr)
    dx = 100 * abs((plus_di - minus_di) / (plus_di + minus_di))
    adx = dx.rolling(window=n).mean()
    return adx

def prepare_data(data):
    data['MA20'] = moving_average(data['收市'], 20)
    data['EMA50'] = exponential_moving_average(data['收市'], 50)
    data['RSI'] = relative_strength_index(data['收市'])
    data['MACD'], data['MACD_signal'], data['MACD_hist'] = macd(data['收市'])
    data['upper_band'], data['middle_band'], data['lower_band'] = bollinger_bands(data['收市'])
    data['Stochastic'] = stochastic_oscillator(data['收市'])
    data['ADX'] = average_directional_index(data)
    data['Return'] = data['收市'].pct_change()
    data['Volatility'] = data['Return'].rolling(window=20).std()
    return data.dropna()

def create_labels(data, future_days=5, threshold=0.01):
    data['Future Price'] = data['收市'].shift(-future_days)
    data['Label'] = 0
    data.loc[data['Future Price'] > data['收市'] * (1 + threshold), 'Label'] = 1
    data.loc[data['Future Price'] < data['收市'] * (1 - threshold), 'Label'] = -1
    return data.dropna()

def train_model(X_train, y_train):
    param_grid = {
        'n_estimators': [100, 200],
        'max_depth': [None, 10, 20],
        'min_samples_split': [2, 5, 10]
    }

    grid_search = GridSearchCV(RandomForestClassifier(random_state=42), param_grid, cv=3, n_jobs=-1, verbose=2)
    grid_search.fit(X_train, y_train)
    model = grid_search.best_estimator_

    return model

# 转换带单位的成交量为数值
def convert_volume(volume_str):
    if 'K' in volume_str:
        return float(volume_str.replace('K', '').replace(',', '')) * 1e3
    elif 'M' in volume_str:
        return float(volume_str.replace('M', '').replace(',', '')) * 1e6
    elif 'B' in volume_str:
        return float(volume_str.replace('B', '').replace(',', '')) * 1e9
    else:
        return float(volume_str.replace(',', ''))

# 加载2020-2023年的数据
file_paths = ['/Users/eric/Desktop/data/乙太坊歷史數據 2019-2023-5.csv', '/Users/eric/Desktop/data/比特幣歷史數據 2019-2023-5.csv', '/Users/eric/Desktop/data/Solana歷史數據 2019-2023-5.csv', '/Users/eric/Desktop/data/狗狗幣歷史數據 2019-2023-5.csv']
data_list = []

for file_path in file_paths:
    data = pd.read_csv(file_path)

    # 确保数据格式正确
    data['收市'] = data['收市'].astype(str).str.replace(',', '').astype(float)
    data['開市'] = data['開市'].astype(str).str.replace(',', '').astype(float)
    data['高'] = data['高'].astype(str).str.replace(',', '').astype(float)
    data['低'] = data['低'].astype(str).str.replace(',', '').astype(float)
    data['成交量'] = data['成交量'].astype(str).apply(convert_volume)

    # 清理和转换'升跌(%)'列
    data['升跌(%)'] = data['升跌(%)'].astype(str).str.replace('%', '').str.replace(',', '')
    data['升跌(%)'] = pd.to_numeric(data['升跌(%)'], errors='coerce')

    # 删除清理过程中可能引入的任何NaN值
    data.dropna(inplace=True)

    # 将数据从旧到新重新排序
    data = data.iloc[::-1].reset_index(drop=True)

    # 准备和标记数据
    data = prepare_data(data)
    data = create_labels(data)
    data_list.append(data)

eth_data_train = pd.concat(data_list, ignore_index=True)

# 分割数据为训练集和验证集
feature_columns = ['MA20', 'EMA50', 'RSI', 'MACD', 'MACD_signal', 'MACD_hist', 'upper_band', 'middle_band', 'lower_band', 'Stochastic', 'ADX', 'Return', 'Volatility']
X_train = eth_data_train[feature_columns]
y_train = eth_data_train['Label']

selector = SelectKBest(f_classif, k=10)
X_train_new = selector.fit_transform(X_train, y_train)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train_new)

# 训练模型
eth_model = train_model(X_train_scaled, y_train)

# 加载2024年的数据
test_file_paths = ['/Users/eric/Desktop/data/乙太坊歷史數據 2023-6-2024.csv']
test_data_list = []

for test_file_path in test_file_paths:
    test_data = pd.read_csv(test_file_path)

    # 确保数据格式正确
    test_data['收市'] = test_data['收市'].astype(str).str.replace(',', '').astype(float)
    test_data['開市'] = test_data['開市'].astype(str).str.replace(',', '').astype(float)
    test_data['高'] = test_data['高'].astype(str).str.replace(',', '').astype(float)
    test_data['低'] = test_data['低'].astype(str).str.replace(',', '').astype(float)
    test_data['成交量'] = test_data['成交量'].astype(str).apply(convert_volume)

    # 清理和转换'升跌(%)'列
    test_data['升跌(%)'] = test_data['升跌(%)'].astype(str).str.replace('%', '').str.replace(',', '')
    test_data['升跌(%)'] = pd.to_numeric(test_data['升跌(%)'], errors='coerce')
    # 删除清理过程中可能引入的任何NaN值
    test_data.dropna(inplace=True)

    # 将数据从旧到新重新排序
    test_data = test_data.iloc[::-1].reset_index(drop=True)

    # 准备和标记数据
    test_data = prepare_data(test_data)
    test_data = create_labels(test_data)
    test_data_list.append(test_data)

# 合并所有测试数据
eth_data_test = pd.concat(test_data_list, ignore_index=True)

# 使用训练好的模型进行预测
X_test = eth_data_test[feature_columns]

# 特征选择和标准化
X_test_new = selector.transform(X_test)
X_test_scaled = scaler.transform(X_test_new)

# 模拟交易函数
def simulate_trading(data, model, X_scaled, initial_cash=100):
    data = data.iloc[-len(X_scaled):]
    data['Predicted'] = model.predict(X_scaled)
    
    cash = initial_cash
    holding = 0  # 持有的以太坊数量
    short_position = 0  # 做空头寸数量
    trade_log = []
    short_position_prize = 0
    short_position_cash = 0

    for index, row in data.iterrows():
        if row['Predicted'] == 1 and cash > 0:  # 预测价格会上涨,买入
            holding = cash / row['收市']
            trade_log.append(f"买入 {holding:.4f} 以太坊 @ {row['收市']} on {row['日期']}")
            cash = 0
        elif row['Predicted'] == -1 and holding > 0:  # 预测价格会下跌,卖出
            cash = holding * row['收市']
            trade_log.append(f"卖出 {holding:.4f} 以太坊 @ {row['收市']} on {row['日期']},交易金额: ${cash:.2f}")
            holding = 0
        elif row['Predicted'] == -1 and cash > 0:  # 预测价格会下跌,做空
            short_position = cash / row['收市']
            short_position_prize = row['收市']
            short_position_cash = cash
            trade_log.append(f"做空 {short_position:.4f} 以太坊 @ {row['收市']} on {row['日期']}")
            cash = 0
        elif row['Predicted'] == 1 and short_position > 0:  # 预测价格会上涨,平仓
            cash = (short_position_prize - row['收市']) * short_position / row['收市'] + short_posistion
            trade_log.append(f"平仓 {short_position:.4f} 以太坊 @ {row['收市']} on {row['日期']},交易金额: ${cash:.2f}")
            short_position = 0
    
    # 计算最终的资产值
    if holding > 0:
        final_value = holding * data.iloc[-1]['收市']
        trade_log.append(f"持有 {holding:.4f} 以太坊到最后 @ {data.iloc[-1]['收市']},总价值: ${final_value:.2f}")
    elif short_position > 0:
        final_value = short_position * data.iloc[-1]['收市']
        trade_log.append(f"持有做空头寸 {short_position:.4f} 以太坊到最后 @ {data.iloc[-1]['收市']},总价值: ${final_value:.2f}")
    else:
        final_value = cash
        trade_log.append(f"最终现金: ${final_value:.2f}")
    
    return final_value, trade_log

# 运行模拟交易
final_cash, trade_log = simulate_trading(eth_data_test, eth_model, X_test_scaled)
print(f"Initial cash: $100")
print(f"Final cash after trading: ${final_cash:.2f}")

# 打印交易日志
for log in trade_log:
    print(log)

# 计算混淆矩阵
y_test = eth_data_test['Label']
y_pred = eth_model.predict(X_test_scaled)
cm = confusion_matrix(y_test, y_pred)

# 绘制混淆矩阵
plt.figure(figsize=(10, 7))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['Down', 'Up'], yticklabels=['Down', 'Up'])
plt.title('Ethereum Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.show()

# 打印模型评估指标
print(f"Accuracy: {accuracy_score(y_test, y_pred):.2f}")
print("Classification Report:")
print(classification_report(y_test, y_pred, target_names=['Down', 'Up']))

   
Leave a Comment