import argparse
import json
import math
import os
from typing import List, Optional, Tuple
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import joblib
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
DEFAULT_WINDOW = 360
DEFAULT_HORIZON = 120
DEFAULT_SCALE_Y = 10.0
BATCH_SIZE = 256
EPS = 1e-8
_signal_cached = {
"model": None,
"scaler": None,
"y_mean": 0.0,
"y_std": 1.0,
"scale_y": DEFAULT_SCALE_Y,
"window": DEFAULT_WINDOW,
"horizon": DEFAULT_HORIZON,
}
def load_artifacts_for_signal(model_path: str,
                              scaler_path: str,
                              label_stats_path: str,
                              input_size_hint: Optional[int] = None,
                              hidden: int = 256,
                              layers: int = 2,
                              bidirectional: bool = True,
                              pooling: str = "attn"):
    """Load the scaler, label statistics and model into the module cache.

    The loaded objects are stored in ``_signal_cached`` so that
    ``get_generated_signal`` can use them. The cached ``window`` and
    ``horizon`` entries are left untouched.

    Args:
        model_path: Checkpoint readable by ``torch.load``; either a plain
            state dict or a dict holding one under ``"state_dict"``.
        scaler_path: Scaler file readable by ``joblib.load``.
        label_stats_path: JSON file with optional ``y_mean``, ``y_std`` and
            ``scale`` entries.
        input_size_hint: Overrides the feature count inferred from the scaler.
        hidden: GRU hidden size.
        layers: Number of GRU layers.
        bidirectional: Whether the GRU is bidirectional.
        pooling: Pooling mode passed to ``GRURegressor``.

    Raises:
        ValueError: If the number of input features cannot be determined.
        RuntimeError: If the checkpoint tensors do not fit the model shapes.
    """
    import warnings

    # NOTE: joblib.load unpickles the file; only load scalers from a trusted
    # source.
    scaler = joblib.load(scaler_path)

    in_features = getattr(scaler, "n_features_in_", None)
    if in_features is None and hasattr(scaler, "mean_"):
        in_features = int(scaler.mean_.shape[0])
    if input_size_hint is not None:
        in_features = int(input_size_hint)
    if not in_features:
        raise ValueError("无法推断输入特征数,请传 input_size_hint")

    with open(label_stats_path, "r", encoding="utf-8") as f:
        stats = json.load(f)
    y_mean = float(stats.get("y_mean", 0.0))
    y_std = float(stats.get("y_std", 1.0))
    scale_y = float(stats.get("scale", DEFAULT_SCALE_Y))

    # Build the model with the structure used at training time.
    model = GRURegressor(
        input_size=in_features,
        hidden_size=hidden,
        num_layers=layers,
        bidirectional=bidirectional,
        dropout=0.2,
        pooling=pooling,
    ).to(DEVICE)

    # Accept both a bare state dict and a {"state_dict": ...} wrapper.
    state = torch.load(model_path, map_location=DEVICE, weights_only=True)
    if isinstance(state, dict) and "state_dict" in state:
        state = state["state_dict"]

    try:
        model.load_state_dict(state, strict=True)
    except RuntimeError as err:
        # strict=False only tolerates missing/unexpected keys; a tensor shape
        # mismatch (e.g. a different input size) still raises here.
        result = model.load_state_dict(state, strict=False)
        # Parameters that were not loaded keep their random initialisation,
        # so make the partial load visible instead of hiding it.
        warnings.warn(
            "Checkpoint did not match the model exactly; loaded with "
            f"strict=False (missing keys: {list(result.missing_keys)}, "
            f"unexpected keys: {list(result.unexpected_keys)}). "
            f"Original error: {err}",
            RuntimeWarning,
            stacklevel=2,
        )
    model.eval()

    _signal_cached.update(dict(
        model=model,
        scaler=scaler,
        y_mean=y_mean,
        y_std=y_std,
        scale_y=scale_y,
    ))
def get_generated_signal(df_recent):
    """Return the expected return predicted from the most recent window.

    Requires ``load_artifacts_for_signal`` to have populated the module
    cache first.

    Args:
        df_recent: Recent klines accepted by ``_prepare_for_scaler``.

    Returns:
        The expected return as a float, or ``0.0`` when fewer than ``window``
        feature rows are available.

    Raises:
        AssertionError: If no model has been loaded yet.
    """
    if _signal_cached["model"] is None:
        # Raised explicitly (instead of via `assert`) so the check also runs
        # under `python -O`; the exception type is unchanged.
        raise AssertionError("请先调用 load_artifacts_for_signal() 加载模型等文件")

    model = _signal_cached["model"]
    scaler = _signal_cached["scaler"]
    y_mean = _signal_cached["y_mean"]
    y_std = _signal_cached["y_std"]
    scale_y = _signal_cached["scale_y"]
    window = _signal_cached.get("window", DEFAULT_WINDOW)
    horizon = _signal_cached.get("horizon", DEFAULT_HORIZON)

    # _prepare_for_scaler works on its own copy of the frame.
    feats = _prepare_for_scaler(df_recent)

    # Check the length before scaling: with too little data (possibly zero
    # rows) no signal is produced and the scaler is never called.
    if feats.shape[0] < window:
        return 0.0

    # Only the last window is fed to the model, so only those rows are scaled.
    # NOTE(review): assumes the scaler transforms each row independently
    # (true for a StandardScaler) — confirm if another scaler is used.
    X = scaler.transform(feats[-window:]).astype(np.float32)  # [T, F]
    X_t = torch.from_numpy(np.expand_dims(X, axis=0)).to(DEVICE)  # [1, T, F]

    with torch.no_grad():
        yhat_std = model(X_t).detach().cpu().numpy().ravel()[0]

    # Undo label standardisation, then the label scale and horizon factor.
    yhat_raw = yhat_std * y_std + y_mean
    sum_log_return = (yhat_raw / float(scale_y)) * math.sqrt(float(horizon))
    expected_return = np.expm1(sum_log_return)
    return float(expected_return)
# -----------------------------
# Model (same as training/inference)
# -----------------------------
class AttnPool(nn.Module):
    """Softmax-weighted sum over axis 1 of the input.

    A single bias-free linear layer scores every position along axis 1; the
    scores are normalised with a softmax and used as weights for the sum.
    """

    def __init__(self, dim):
        super().__init__()
        # Kept under the name `w` so existing checkpoints still load.
        self.w = nn.Linear(dim, 1, bias=False)

    def forward(self, h):
        """Pool ``h`` over axis 1 (the time axis of the batch-first GRU output)."""
        scores = self.w(h).squeeze(-1)
        weights = scores.softmax(dim=1)
        weighted = h * weights.unsqueeze(-1)
        return weighted.sum(dim=1)
class GRURegressor(nn.Module):
    """GRU encoder followed by pooling, dropout and a scalar regression head.

    Pooling modes: ``"mean"`` averages the GRU outputs over axis 1,
    ``"attn"`` uses ``AttnPool``, and any other value takes the output at
    the last position of axis 1.
    """

    def __init__(self, input_size: int, hidden_size: int = 128, num_layers: int = 1,
                 bidirectional: bool = False, dropout: float = 0.0, pooling: str = "attn"):
        super().__init__()
        self.bidirectional = bidirectional
        if bidirectional:
            self.num_directions = 2
        else:
            self.num_directions = 1
        self.hidden_size = hidden_size
        self.pooling = pooling

        # Dropout between GRU layers is only enabled for stacked GRUs.
        inter_layer_dropout = 0.0
        if num_layers > 1:
            inter_layer_dropout = dropout

        self.gru = nn.GRU(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=bidirectional,
            dropout=inter_layer_dropout,
        )
        self.out_drop = nn.Dropout(p=0.2)

        feature_dim = self.num_directions * hidden_size
        self.reg_head = nn.Linear(feature_dim, 1)
        # The attention pooling module only exists in "attn" mode.
        if self.pooling == "attn":
            self.attn_pool = AttnPool(feature_dim)

    def _pool(self, seq_out):
        """Collapse axis 1 of the GRU output according to ``self.pooling``."""
        if self.pooling == "mean":
            return seq_out.mean(dim=1)
        if self.pooling == "attn":
            return self.attn_pool(seq_out)
        return seq_out[:, -1, :]

    def forward(self, x):
        """Return one regression value per batch element."""
        seq_out, _hidden = self.gru(x)
        pooled = self.out_drop(self._pool(seq_out))
        return self.reg_head(pooled).squeeze(-1)
def _read_raw(csv_path: str) -> pd.DataFrame:
if not os.path.exists(csv_path):
raise FileNotFoundError(f"CSV not found: {csv_path}")
return pd.read_csv(csv_path)
def _extract_time_col(df: pd.DataFrame) -> Optional[pd.Series]:
time_col = None
if "close_time" in df.columns:
time_col = df["close_time"]
elif "open_time" in df.columns:
time_col = df["open_time"]
if time_col is None:
return None
s = time_col.copy()
try:
if pd.api.types.is_numeric_dtype(s) and s.max() > 1e12:
s = pd.to_datetime(s, unit="ms", utc=True).dt.tz_convert("UTC")
else:
s = pd.to_datetime(s, utc=True, errors="coerce")
if s.isna().all() and pd.api.types.is_numeric_dtype(time_col):
s = pd.to_datetime(time_col, unit="s", utc=True)
except Exception:
return None
return s.dt.strftime("%Y-%m-%dT%H:%M:%SZ")
def _prepare_for_scaler(df: pd.DataFrame) -> np.ndarray:
    """Build the unscaled model feature matrix from a klines frame.

    Works on a copy of ``df``. Time and ``ignore`` columns are dropped, all
    remaining columns are coerced to numeric, and ``log_return``,
    ``ema_diff`` and ``vol_roll`` are derived from ``close`` (or from the
    first remaining column when ``close`` is absent). Rows where a feature or
    one of its sources (``close``, ``log_return``) is NaN or infinite are
    removed.

    Returns:
        A float32 array with one row per surviving bar and the columns
        ``vol_roll, ema_diff, volume, open, low, high`` in that order.

    Raises:
        KeyError: If a required feature column is missing.
    """
    df = df.copy()
    df = df.drop(columns=[c for c in ("open_time", "close_time", "ignore") if c in df.columns])
    for c in df.columns:
        df[c] = pd.to_numeric(df[c], errors="coerce")

    if "close" in df.columns:
        close = df["close"].astype(float).values
    else:
        close = df.iloc[:, 0].astype(float).values
        df["close"] = close

    # The first bar has no predecessor, so its log return stays 0.
    lr = np.zeros_like(close, dtype=np.float64)
    lr[1:] = np.log((close[1:] + EPS) / (close[:-1] + EPS))
    df["log_return"] = lr

    ema20 = df["close"].ewm(span=20, adjust=False, min_periods=5).mean()
    ema60 = df["close"].ewm(span=60, adjust=False, min_periods=10).mean()
    df["ema_diff"] = (ema20 - ema60) / (ema60.abs() + EPS)
    df["vol_roll"] = df["log_return"].rolling(60, min_periods=10).std()

    keep_cols = ["vol_roll", "ema_diff", "volume", "open", "low", "high"]
    for c in keep_cols:
        if c not in df.columns:
            raise KeyError(f"缺少必须特征列: {c}")

    # Only the features and their sources decide whether a row is usable; an
    # unrelated column (e.g. a text column coerced to NaN) must not discard
    # every row.
    check_cols = keep_cols + ["close", "log_return"]
    cleaned = df[check_cols].replace([np.inf, -np.inf], np.nan).dropna()
    return cleaned[keep_cols].to_numpy(dtype=np.float32)
def _windows_view(data: np.ndarray, window: int) -> np.ndarray:
N = data.shape[0] - window + 1
xs: List[np.ndarray] = []
for i in range(N):
xs.append(data[i:i+window])
return np.stack(xs, axis=0)
def pred_to_metrics(yhat_std: np.ndarray, y_mean: float, y_std: float, scale_y: float,
                    horizon: Optional[float] = None) -> np.ndarray:
    """Convert standardised model outputs into expected returns.

    Args:
        yhat_std: Model outputs in standardised label space.
        y_mean: Label mean used to undo the standardisation.
        y_std: Label standard deviation used to undo the standardisation.
        scale_y: Label scale the de-standardised value is divided by.
        horizon: Horizon whose square root multiplies the result; defaults to
            ``DEFAULT_HORIZON`` when omitted.

    Returns:
        ``expm1`` of the rescaled prediction, with the same shape as
        ``yhat_std``.
    """
    if horizon is None:
        horizon = DEFAULT_HORIZON
    yhat_raw = yhat_std * y_std + y_mean
    sum_log_return = (yhat_raw / float(scale_y)) * math.sqrt(float(horizon))
    return np.expm1(sum_log_return)
def main():
    """CLI entry point: score every sliding window of a klines CSV.

    Loads the scaler, label statistics and model, builds the feature matrix,
    predicts one expected return per window end and writes the rows
    ``index, time, expected_return`` to the output CSV. ``index`` is the row
    index of the window end in the cleaned feature matrix.

    Raises:
        FileNotFoundError: If the scaler, label-stats or input CSV is missing.
        ValueError: If fewer than ``DEFAULT_WINDOW`` feature rows remain.
    """
    ap = argparse.ArgumentParser(description="Generate per-bar signals CSV for backtesting.")
    ap.add_argument("--csv", type=str, required=True, help="Input klines CSV.")
    ap.add_argument("--model", type=str, default="return/checkpoints/best_icp.pt", help="Trained model state_dict path.")
    ap.add_argument("--scaler", type=str, default="return/configs/scaler.joblib", help="StandardScaler path.")
    ap.add_argument("--label-stats", type=str, default="return/configs/label_stats.json", help="Label stats JSON path.")
    ap.add_argument("--hidden", type=int, default=256)
    ap.add_argument("--layers", type=int, default=2)
    ap.add_argument("--out", type=str, default="signals.csv", help="Output CSV path.")
    args = ap.parse_args()

    # Load artifacts
    if not os.path.exists(args.scaler):
        raise FileNotFoundError(f"Scaler not found: {args.scaler}")
    # NOTE: joblib.load unpickles the file; only use scalers from a trusted
    # source.
    scaler = joblib.load(args.scaler)
    if not os.path.exists(args.label_stats):
        raise FileNotFoundError(f"Label stats not found: {args.label_stats}")
    with open(args.label_stats, "r", encoding="utf-8") as f:
        label_stats = json.load(f)
    y_mean = float(label_stats.get("y_mean", 0.0))
    y_std = float(label_stats.get("y_std", 1.0))
    scale_y = float(label_stats.get("scale", DEFAULT_SCALE_Y))

    # Load data
    raw = _read_raw(args.csv)
    time_series = _extract_time_col(raw)  # may be None
    feats = _prepare_for_scaler(raw)

    # Validate the row count before scaling so short (or empty) inputs get a
    # clear error instead of a scaler failure.
    T = DEFAULT_WINDOW
    N_total = feats.shape[0]
    if N_total < T:
        raise ValueError(f"Not enough rows: need at least {T}, got {N_total}")
    data = scaler.transform(feats.astype(np.float32)).astype(np.float32)

    # _prepare_for_scaler drops rows (at least the indicator warm-up rows at
    # the start), so row i of `data` is not row i of `raw`. Align timestamps
    # from the tail.
    # NOTE(review): exact only when all dropped rows are leading rows; rows
    # dropped mid-series cannot be located from here.
    row_offset = len(raw) - N_total

    # Predict for the window ending at every bar t in [T-1 .. N_total-1].
    idx_ends = np.arange(T - 1, N_total, 1, dtype=np.int64)

    # Build model
    model = GRURegressor(
        input_size=data.shape[1],
        hidden_size=args.hidden,
        num_layers=args.layers,
        bidirectional=True,
        dropout=0.2,
        pooling="attn",
    ).to(DEVICE)

    # Weights-only load; accept a bare state dict or a {"state_dict": ...}
    # wrapper, matching load_artifacts_for_signal.
    state = torch.load(args.model, map_location=DEVICE, weights_only=True)
    if isinstance(state, dict) and "state_dict" in state:
        state = state["state_dict"]
    model.load_state_dict(state, strict=True)
    model.eval()

    results = []
    with torch.no_grad():
        # Build windows batch by batch to avoid materialising all of them.
        for start in range(0, len(idx_ends), BATCH_SIZE):
            ends_batch = idx_ends[start:start + BATCH_SIZE]
            X = np.stack([data[e - T + 1:e + 1] for e in ends_batch], axis=0)  # [B, T, F]
            X_t = torch.from_numpy(X).to(DEVICE)
            yhat_std = model(X_t).detach().cpu().numpy().astype(np.float64)  # [B]
            # Use the label scale read from the stats file, not the default.
            expected_return = pred_to_metrics(yhat_std, y_mean, y_std, scale_y)

            for idx, value in zip(ends_batch.tolist(), expected_return.tolist()):
                ts = ""  # default empty
                raw_idx = idx + row_offset
                if time_series is not None and raw_idx < len(time_series):
                    ts = str(time_series.iloc[raw_idx])
                results.append({
                    "index": idx,
                    "time": ts,
                    "expected_return": float(value),
                })

    # Save to CSV
    out_dir = os.path.dirname(args.out)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    pd.DataFrame(results).to_csv(args.out, index=False, encoding="utf-8")
    print(f"Wrote {len(results)} rows to {args.out}")


if __name__ == "__main__":
    main()
虚拟货币价格预测
回测结果:3-6月表现不错,但拉长到一年结果就完蛋;然而 Grok 的评价却是"核弹级别的结果,直接可以在金融市场抢钱"。
AI 不会说、不懂金融的人也不知道的是:因为几个月区间内货币价格涨幅不大,导致误差很小,模型一直在收敛,让人以为它学到了东西,其实什么也没学到。
总结:我支持金融市场完全随机假说!