使用虚拟货币数据进行深度学习

import argparse
import json
import math
import os
from typing import List, Optional, Tuple

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import joblib

# Device used for the model and its input tensors: CUDA when torch reports it
# as available, otherwise the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Number of feature rows in one model input window.
DEFAULT_WINDOW = 360
# Horizon whose square root rescales a prediction into a summed log return.
DEFAULT_HORIZON = 120
# Label scale used when the label-stats JSON carries no "scale" entry.
DEFAULT_SCALE_Y = 10.0

# Number of windows sent through the model per forward pass in main().
BATCH_SIZE = 256

# Small constant added to prices / denominators to avoid division by zero.
EPS = 1e-8

# Module-level cache filled by load_artifacts_for_signal() and read by
# get_generated_signal(). "model" is None until artifacts have been loaded.
_signal_cached = dict(
    model=None,
    scaler=None,
    y_mean=0.0,
    y_std=1.0,
    scale_y=DEFAULT_SCALE_Y,
    window=DEFAULT_WINDOW,
    horizon=DEFAULT_HORIZON,
)

def load_artifacts_for_signal(model_path: str,
                              scaler_path: str,
                              label_stats_path: str,
                              input_size_hint: Optional[int] = None,
                              hidden: int = 256,
                              layers: int = 2,
                              bidirectional: bool = True,
                              pooling: str = "attn") -> None:
    """Load scaler, label statistics and model weights into the signal cache.

    After this call, get_generated_signal() can be used. The cache entries
    "window" and "horizon" are not modified here and keep their current values.

    Args:
        model_path: Checkpoint readable by ``torch.load(..., weights_only=True)``;
            either a plain state_dict or a dict holding one under "state_dict".
        scaler_path: Scaler file loaded with ``joblib.load``.
        label_stats_path: JSON file; the keys "y_mean", "y_std" and "scale" are
            read, falling back to 0.0, 1.0 and DEFAULT_SCALE_Y respectively.
        input_size_hint: When given, overrides the feature count inferred from
            the scaler (``n_features_in_`` or the length of ``mean_``).
        hidden: GRU hidden size.
        layers: Number of GRU layers.
        bidirectional: Whether the GRU is bidirectional.
        pooling: Pooling mode passed to GRURegressor.

    Raises:
        ValueError: If the number of input features cannot be determined.
        RuntimeError: If the checkpoint cannot be loaded even non-strictly
            (presumably tensor shape mismatches, which PyTorch reports
            regardless of ``strict`` — see the load_state_dict documentation).
    """
    # Local import: only needed for the non-strict fallback warning below.
    import warnings

    # NOTE(review): joblib.load unpickles the file; only load trusted scalers.
    scaler = joblib.load(scaler_path)

    in_features = getattr(scaler, "n_features_in_", None)
    if in_features is None and hasattr(scaler, "mean_"):
        in_features = int(scaler.mean_.shape[0])
    if input_size_hint is not None:
        in_features = int(input_size_hint)
    if not in_features:
        raise ValueError("无法推断输入特征数,请传 input_size_hint")

    with open(label_stats_path, "r", encoding="utf-8") as f:
        stats = json.load(f)
    y_mean = float(stats.get("y_mean", 0.0))
    y_std = float(stats.get("y_std", 1.0))
    scale_y = float(stats.get("scale", DEFAULT_SCALE_Y))

    # Build the model with the architecture given by the arguments.
    model = GRURegressor(
        input_size=in_features,
        hidden_size=hidden,
        num_layers=layers,
        bidirectional=bidirectional,
        dropout=0.2,
        pooling=pooling,
    ).to(DEVICE)

    # Accept both a bare state_dict and a {"state_dict": ...} wrapper.
    state = torch.load(model_path, map_location=DEVICE, weights_only=True)
    if isinstance(state, dict) and "state_dict" in state:
        state = state["state_dict"]
    try:
        model.load_state_dict(state, strict=True)
    except RuntimeError as err:
        # Best-effort fallback for checkpoints whose key set differs from the
        # model. Parameters listed as missing stay randomly initialised, so the
        # mismatch is reported instead of being swallowed silently.
        incompatible = model.load_state_dict(state, strict=False)
        warnings.warn(
            f"Checkpoint {model_path!r} does not match the model strictly and was "
            f"loaded with strict=False. Missing keys (left at random init): "
            f"{list(incompatible.missing_keys)}; unexpected keys (ignored): "
            f"{list(incompatible.unexpected_keys)}. Original error: {err}",
            RuntimeWarning,
            stacklevel=2,
        )
    model.eval()

    _signal_cached.update(
        model=model,
        scaler=scaler,
        y_mean=y_mean,
        y_std=y_std,
        scale_y=scale_y,
    )


def get_generated_signal(df_recent):
    """Return the expected return predicted from the most recent window.

    Uses the model, scaler and label statistics cached by
    load_artifacts_for_signal(). Features are built with _prepare_for_scaler(),
    the last ``window`` feature rows are scaled and fed to the model, and the
    standardized prediction is converted with
    ``expm1((yhat * y_std + y_mean) / scale_y * sqrt(horizon))``.

    Args:
        df_recent: Recent kline data accepted by _prepare_for_scaler(). It is
            not modified (that helper works on its own copy).

    Returns:
        The expected return as a float, or 0.0 when fewer than ``window``
        feature rows are available.

    Raises:
        AssertionError: If no model has been loaded yet.
    """
    model = _signal_cached["model"]
    if model is None:
        # Explicit raise instead of `assert` so the guard is not stripped under
        # `python -O`; the exception type is unchanged for existing callers.
        raise AssertionError("请先调用 load_artifacts_for_signal() 加载模型等文件")
    scaler = _signal_cached["scaler"]
    y_mean = _signal_cached["y_mean"]
    y_std = _signal_cached["y_std"]
    scale_y = _signal_cached["scale_y"]
    window = _signal_cached.get("window", DEFAULT_WINDOW)
    horizon = _signal_cached.get("horizon", DEFAULT_HORIZON)

    # Build the feature matrix; rows with NaN/inf are dropped by the helper.
    feats = _prepare_for_scaler(df_recent)

    # Check the length BEFORE scaling: with too little data the feature matrix
    # can be empty, and the scaler should never be asked to transform it.
    if feats.shape[0] < window:
        # Not enough data: emit no signal.
        return 0.0

    # Only the last window is needed, so only those rows are scaled.
    # NOTE(review): assumes the scaler transforms each row independently
    # (true for a fitted StandardScaler) — confirm if another scaler is used.
    X = scaler.transform(feats[-window:]).astype(np.float32)
    X_t = torch.from_numpy(X[np.newaxis, ...]).to(DEVICE)  # leading batch axis of 1

    with torch.no_grad():
        yhat_std = model(X_t).detach().cpu().numpy().ravel()[0]

    # Undo label standardization, then rescale to a summed log return.
    yhat_raw = yhat_std * y_std + y_mean
    sum_log_return = (yhat_raw / float(scale_y)) * math.sqrt(float(horizon))
    return float(np.expm1(sum_log_return))

# -----------------------------
# Model (same as training/inference)
# -----------------------------
class AttnPool(nn.Module):
    """Attention pooling: softmax-weighted sum of the input over dimension 1."""

    def __init__(self, dim):
        super().__init__()
        # One scalar score per position; no bias term.
        self.w = nn.Linear(dim, 1, bias=False)

    def forward(self, h):
        """Pool ``h`` over dim 1 with learned softmax weights.

        ``h`` is expected to have ``dim`` features on its last axis; the
        result has dim 1 summed out.
        """
        scores = self.w(h).squeeze(-1)
        weights = scores.softmax(dim=1)
        weighted = h * weights.unsqueeze(-1)
        return weighted.sum(dim=1)

class GRURegressor(nn.Module):
    """GRU encoder followed by pooling over the sequence and a linear head.

    Pooling modes: "attn" uses AttnPool, "mean" averages over dim 1, and any
    other value takes the output at the last position of dim 1.
    """

    def __init__(self, input_size: int, hidden_size: int = 128, num_layers: int = 1,
                 bidirectional: bool = False, dropout: float = 0.0, pooling: str = "attn"):
        super().__init__()
        self.pooling = pooling
        self.hidden_size = hidden_size
        self.bidirectional = bidirectional
        self.num_directions = 1 if not bidirectional else 2

        # Inter-layer dropout is only passed to the GRU when it is stacked.
        gru_dropout = 0.0
        if num_layers > 1:
            gru_dropout = dropout

        self.gru = nn.GRU(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=bidirectional,
            dropout=gru_dropout,
        )
        # Dropout on the pooled features; fixed at 0.2 independent of `dropout`.
        self.out_drop = nn.Dropout(p=0.2)

        feat_dim = hidden_size * self.num_directions
        self.reg_head = nn.Linear(feat_dim, 1)

        # The attention pooler only exists in "attn" mode.
        if self.pooling == "attn":
            self.attn_pool = AttnPool(feat_dim)

    def forward(self, x):
        """Return one regression value per batch element (last axis squeezed)."""
        seq_out, _ = self.gru(x)

        mode = self.pooling
        if mode == "attn":
            pooled = self.attn_pool(seq_out)
        elif mode == "mean":
            pooled = seq_out.mean(dim=1)
        else:
            pooled = seq_out[:, -1, :]

        pooled = self.out_drop(pooled)
        return self.reg_head(pooled).squeeze(-1)

def _read_raw(csv_path: str) -> pd.DataFrame:
    """Read the CSV at ``csv_path`` into a DataFrame.

    Raises:
        FileNotFoundError: If ``csv_path`` does not exist.
    """
    path_exists = os.path.exists(csv_path)
    if path_exists:
        return pd.read_csv(csv_path)
    raise FileNotFoundError(f"CSV not found: {csv_path}")


def _extract_time_col(df: pd.DataFrame) -> Optional[pd.Series]:
    """Return bar timestamps as ISO-8601 UTC strings, or None if unavailable.

    The "close_time" column is preferred, then "open_time". Numeric columns are
    treated as Unix epochs whose unit is chosen from the magnitude of the
    largest value; other columns are parsed as date strings (unparseable
    entries become NaT).

    Args:
        df: Raw kline frame.

    Returns:
        A Series of strings formatted as ``%Y-%m-%dT%H:%M:%SZ`` with the same
        length as ``df``, or None when no time column exists or conversion
        fails.
    """
    for name in ("close_time", "open_time"):
        if name in df.columns:
            time_col = df[name]
            break
    else:
        return None

    try:
        if pd.api.types.is_numeric_dtype(time_col):
            # Pick the epoch unit from the magnitude. Without an explicit unit
            # pandas reads integers as nanoseconds, which would silently map
            # epoch seconds to 1970 instead of the real date.
            # NOTE(review): the unit is derived from the column maximum, so a
            # column mixing several units is not supported.
            largest = time_col.max()
            if largest > 1e18:
                unit = "ns"
            elif largest > 1e15:
                unit = "us"
            elif largest > 1e12:
                unit = "ms"
            else:
                unit = "s"
            parsed = pd.to_datetime(time_col, unit=unit, utc=True)
        else:
            parsed = pd.to_datetime(time_col, utc=True, errors="coerce")
    except Exception:
        # Deliberate best effort: timestamps are optional for the caller, so
        # any conversion failure simply means "no timestamps".
        return None
    return parsed.dt.strftime("%Y-%m-%dT%H:%M:%SZ")


def _prepare_for_scaler(df: pd.DataFrame) -> np.ndarray:
    """Build the float32 feature matrix that is fed to the scaler.

    Columns of the result, in order: vol_roll, ema_diff, volume, open, low,
    high. ``vol_roll`` is the rolling (60, min 10) standard deviation of the
    close-to-close log return; ``ema_diff`` is ``(EMA20 - EMA60) / |EMA60|``
    of the close. When there is no "close" column, the first remaining column
    is used as the close.

    Rows in which any feature input (the six output columns, "close" or
    "log_return") is NaN or infinite are dropped, so the result usually has
    fewer rows than ``df`` (the indicator warm-up rows are always removed) and
    result row ``i`` does not generally correspond to input row ``i``.
    Columns that do not feed the features no longer cause rows to be dropped:
    previously a single non-numeric extra column (coerced to NaN) removed
    every row.

    Args:
        df: Raw kline frame; it is not modified.

    Returns:
        Array of shape ``[rows_kept, 6]`` and dtype float32.

    Raises:
        KeyError: If one of "volume", "open", "low", "high" is missing.
    """
    raw_required = ["volume", "open", "low", "high"]
    keep_cols = ["vol_roll", "ema_diff"] + raw_required

    # drop() returns a new frame, so the caller's DataFrame stays untouched.
    df = df.drop(columns=["open_time", "close_time", "ignore"], errors="ignore")

    # Fail fast, before any feature work, when a required raw column is absent.
    for c in raw_required:
        if c not in df.columns:
            raise KeyError(f"缺少必须特征列: {c}")

    for col in df.columns:
        df[col] = pd.to_numeric(df[col], errors="coerce")

    if "close" not in df.columns:
        # Fall back to the first remaining column as the price series.
        df["close"] = df.iloc[:, 0].astype(float).values
    close = df["close"].astype(float).values

    # Log return of consecutive closes; the first row has no predecessor -> 0.
    log_ret = np.zeros_like(close, dtype=np.float64)
    log_ret[1:] = np.log((close[1:] + EPS) / (close[:-1] + EPS))
    df["log_return"] = log_ret

    ema20 = df["close"].ewm(span=20, adjust=False, min_periods=5).mean()
    ema60 = df["close"].ewm(span=60, adjust=False, min_periods=10).mean()
    df["ema_diff"] = (ema20 - ema60) / (ema60.abs() + EPS)

    df["vol_roll"] = df["log_return"].rolling(60, min_periods=10).std()

    # Only the columns that feed the features decide whether a row is usable.
    relevant = df[keep_cols + ["close", "log_return"]]
    relevant = relevant.replace([np.inf, -np.inf], np.nan).dropna()
    return relevant[keep_cols].values.astype(np.float32)


def _windows_view(data: np.ndarray, window: int) -> np.ndarray:
    """Return every length-``window`` slice of ``data`` along axis 0.

    Despite the name, the result is an independent, writable copy (not a view
    into ``data``): ``result[i]`` equals ``data[i:i + window]``.

    Args:
        data: Array whose first axis is the one being windowed.
        window: Number of consecutive rows per window.

    Returns:
        Array of shape ``[len(data) - window + 1, window, *data.shape[1:]]``.

    Raises:
        ValueError: If ``window`` is smaller than 1 or larger than the number
            of rows in ``data``.
    """
    rows = data.shape[0]
    if window < 1 or window > rows:
        raise ValueError(
            f"window must be between 1 and the number of rows ({rows}), got {window}"
        )
    # sliding_window_view appends the window axis last; move it right behind
    # the window-index axis and materialize a copy, as np.stack used to do.
    view = np.lib.stride_tricks.sliding_window_view(data, window, axis=0)
    return np.moveaxis(view, -1, 1).copy()


def pred_to_metrics(yhat_std: np.ndarray, y_mean: float, y_std: float, scale_y: float,
                    horizon: Optional[float] = None) -> np.ndarray:
    """Convert standardized model outputs into expected simple returns.

    Computes ``expm1((yhat_std * y_std + y_mean) / scale_y * sqrt(horizon))``.

    Args:
        yhat_std: Standardized predictions.
        y_mean: Label mean used to undo the standardization.
        y_std: Label standard deviation used to undo the standardization.
        scale_y: Label scale the de-standardized value is divided by.
        horizon: Horizon whose square root rescales the value; defaults to
            DEFAULT_HORIZON when omitted, which is the previous behavior.

    Returns:
        Expected returns with the same shape as ``yhat_std``.
    """
    if horizon is None:
        horizon = DEFAULT_HORIZON
    yhat_raw = yhat_std * y_std + y_mean
    sum_log_return = (yhat_raw / float(scale_y)) * math.sqrt(float(horizon))
    return np.expm1(sum_log_return)

def main():
    """CLI entry point: write one expected-return prediction per bar to a CSV.

    Loads the scaler, label statistics and model weights, builds features from
    the input klines CSV, runs the model on every window of DEFAULT_WINDOW
    consecutive feature rows and writes the columns "index", "time" and
    "expected_return" to ``--out``.

    "index" is the position of the window's last row in the cleaned feature
    matrix. "time" is the timestamp of the raw bar that row was built from.

    Raises:
        FileNotFoundError: If the CSV, scaler, label stats or model is missing.
        ValueError: If fewer than DEFAULT_WINDOW feature rows are available.
    """
    ap = argparse.ArgumentParser(description="Generate per-bar signals CSV for backtesting.")
    ap.add_argument("--csv", type=str, required=True, help="Input klines CSV.")
    ap.add_argument("--model", type=str, default="return/checkpoints/best_icp.pt", help="Trained model state_dict path.")
    ap.add_argument("--scaler", type=str, default="return/configs/scaler.joblib", help="StandardScaler path.")
    ap.add_argument("--label-stats", type=str, default="return/configs/label_stats.json", help="Label stats JSON path.")

    ap.add_argument("--hidden", type=int, default=256)
    ap.add_argument("--layers", type=int, default=2)

    ap.add_argument("--out", type=str, default="signals.csv", help="Output CSV path.")

    args = ap.parse_args()

    # Load artifacts
    if not os.path.exists(args.scaler):
        raise FileNotFoundError(f"Scaler not found: {args.scaler}")
    # NOTE(review): joblib.load unpickles the file; only load trusted scalers.
    scaler = joblib.load(args.scaler)

    if not os.path.exists(args.label_stats):
        raise FileNotFoundError(f"Label stats not found: {args.label_stats}")
    with open(args.label_stats, "r", encoding="utf-8") as f:
        label_stats = json.load(f)
    y_mean = float(label_stats.get("y_mean", 0.0))
    y_std = float(label_stats.get("y_std", 1.0))
    scale_y = float(label_stats.get("scale", DEFAULT_SCALE_Y))

    if not os.path.exists(args.model):
        raise FileNotFoundError(f"Model not found: {args.model}")

    # Load data
    raw = _read_raw(args.csv)
    time_series = _extract_time_col(raw)  # may be None
    feats = _prepare_for_scaler(raw)
    data = scaler.transform(feats.astype(np.float32)).astype(np.float32)

    # Windows
    T = DEFAULT_WINDOW
    N_total = data.shape[0]
    if N_total < T:
        raise ValueError(f"Not enough rows: need at least {T}, got {N_total}")

    # _prepare_for_scaler drops rows (at least the indicator warm-up rows at
    # the start), so cleaned row i is NOT raw row i. Align the timestamps from
    # the tail instead; looking them up with the cleaned index would stamp
    # every prediction several bars too early.
    # NOTE(review): exact when only leading rows were dropped; if rows were
    # also dropped in the middle, timestamps before the last gap are shifted.
    row_offset = len(raw) - N_total

    # Predict for the window ending at each cleaned row t in [T-1 .. N_total-1].
    idx_ends = np.arange(T - 1, N_total, 1, dtype=np.int64)
    results = []

    # Build model
    model = GRURegressor(
        input_size=data.shape[1],
        hidden_size=args.hidden,
        num_layers=args.layers,
        bidirectional=True,
        dropout=0.2,
        pooling="attn",
    ).to(DEVICE)

    # Safe weights-only load; accept both a bare state_dict and a
    # {"state_dict": ...} wrapper, like load_artifacts_for_signal().
    state = torch.load(args.model, map_location=DEVICE, weights_only=True)
    if isinstance(state, dict) and "state_dict" in state:
        state = state["state_dict"]
    model.load_state_dict(state, strict=True)
    model.eval()

    with torch.no_grad():
        # Build windows batch by batch to avoid huge memory on very long series.
        for start in range(0, len(idx_ends), BATCH_SIZE):
            ends_batch = idx_ends[start:start + BATCH_SIZE]
            X = np.stack([data[e - T + 1:e + 1] for e in ends_batch], axis=0)  # [B, T, F]
            X_t = torch.from_numpy(X).to(DEVICE)
            yhat_std = model(X_t).detach().cpu().numpy().astype(np.float64)  # [B]

            # Use the scale read from the label stats, not the module default.
            expected_return = pred_to_metrics(yhat_std, y_mean, y_std, scale_y)

            # Collect rows
            for e, er in zip(ends_batch, expected_return):
                idx = int(e)
                raw_idx = idx + row_offset
                ts = ""  # default empty
                if time_series is not None and raw_idx < len(time_series):
                    ts = str(time_series.iloc[raw_idx])
                results.append({
                    "index": idx,
                    "time": ts,
                    "expected_return": float(er),
                })

    # Save to CSV
    out_dir = os.path.dirname(args.out)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    pd.DataFrame(results).to_csv(args.out, index=False, encoding="utf-8")
    print(f"Wrote {len(results)} rows to {args.out}")


if __name__ == "__main__":
    main()

虚拟货币价格预测

回测结果:在 3–6 个月的区间内表现不错,但拉长到一年结果就完蛋了。然而 Grok 对这个结果的评价却是"核弹级别",认为可以直接在金融市场"抢钱"。

AI 不会告诉你、不懂金融的人也不知道的是:在几个月的区间内,货币价格涨跌幅不大,预测误差自然很小,模型的损失一直在收敛,让人以为它学到了东西,其实什么也没学到。

总结:我支持金融市场完全随机假说!