# /home/agent-jay/claudeCode/jarvis/finance/indicators/berg_combos.py
#
# berg_combos.py — combo recipes (C1-C11) + precedent backtest

"""Milton Berg combo engine — wires atoms into the 11 named combos.

Each combo is a structured ``ComboSpec`` carrying:
  * Required atom detectors (already partial-applied with combo-specific params).
  * The logical instrument each atom targets (``spx`` / ``ndx`` / ``rut`` /
    ``sox`` / ``vix`` / ``nyse``).
  * Berg's named precedent dates for Gate G2 validation (±tolerance match).
  * Source quote and notes (including any blocked atoms).

A combo is "blocked" when one or more required atoms need external feeds
(NDR oscillator, NYSE upside/downside volume, IPO calendar, ...) that
are not yet wired up in research.db.  Blocked combos surface cleanly
in ``evaluate_all_combos`` so Gate G2 verdicts only reference combos
that are actually live.

PUBLIC API
----------
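* ``BERG_COMBO_REGISTRY`` — dict mapping combo_id -> ComboSpec.
* ``list_combos(include_blocked=True)`` — list of ComboSpec, optionally
  excluding blocked combos.
* ``evaluate_combo(combo_id, panel)`` — bool Series of fire dates.
* ``evaluate_all_combos(panel)`` — long-form DataFrame of all live combos.
* ``backtest_precedents(combo_id, panel, tolerance_days=2)`` — Gate G2 helper.
* ``precedent_summary(panel, tolerance_days=2)`` — one-row-per-combo Gate G2 table.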

Data contract for ``panel``:
  ``Mapping[str, pd.DataFrame]`` keyed by logical instrument id; each frame
  has a DatetimeIndex and lowercase OHLCV columns (subset per atom needs).

Year-of-precedent note: Berg's May 2026 interview was framed as "2025-04
low signals", but the catalog transcription lists C6/C7/C8/C10 with
"2026" dates. We treat 2025 as the primary precedent year (matching the
interview framing) and surface 2026 as alt for diagnostic comparison.
"""
from __future__ import annotations

from dataclasses import dataclass, field
from functools import partial
from typing import Callable, Mapping

import pandas as pd

from finance.indicators.berg_atoms import (
    b6_new_n_day_low,
    b7_new_n_day_closing_high,
    p1_3d_decline_at_least_7pct,
    p2_1d_gain_is_250d_max,
    p3_10d_gain_is_180d_max,
    p4_10d_roc_is_180d_max,
    p5_10d_roc_at_least_20pct,
    p6_drawdown_from_252d_high,
    p6_drawdown_reached_within,
    p7_recovery_thrust,
    s5_up_n_of_m_days,
    s6_up_16_of_19_days,
    t1_below_250d_sma,
    t2_held_low_for_at_least,
    v1_5d_volume_at_200d_high,
    v2_5d_avg_volume_375d_high_within_20d,
    v5_signal_day_volume_up_20pct,
    vx1_vix_in_range,
    vx2_vix_5d_20d_stdev_ratio,
)


# ============================================================================
# Instrument keys
# ============================================================================
INSTR_SPX = "spx"
INSTR_NDX = "ndx"
INSTR_RUT = "rut"
INSTR_SOX = "sox"
INSTR_VIX = "vix"
INSTR_NYSE = "nyse"


# ============================================================================
# Spec types
# ============================================================================

@dataclass(frozen=True)
class AtomReq:
    """One atom requirement within a combo.

    ``detector`` is the atom function with combo-specific params already
    bound via ``functools.partial``. Calling ``detector(df)`` returns a
    bool ``pd.Series`` aligned to ``df.index``.
    """
    label: str
    atom_id: str
    instrument: str
    detector: Callable[[pd.DataFrame], pd.Series]


@dataclass(frozen=True)
class ComboSpec:
    """Berg combo: AND-reduction of N atoms across instruments."""
    id: str
    name: str
    rationale: str
    atoms: tuple[AtomReq, ...]
    precedent_dates: tuple[str, ...]
    source_quote: str
    notes: str = ""
    blocked: bool = False
    blocked_reason: str = ""
    alt_precedent_dates: tuple[str, ...] = field(default_factory=tuple)


# ============================================================================
# Combo registry — 11 combos
# ============================================================================

BERG_COMBO_REGISTRY: dict[str, ComboSpec] = {
    "C1": ComboSpec(
        id="C1",
        name="1957-style capitulation turn",
        rationale=(
            "Capitulation low: 5d-avg NYSE volume regime + 60d new low "
            "+ 10%+ drawdown + extreme downside volume + reverse breadth thrust."
        ),
        atoms=(
            AtomReq("V2_375d_within_20d", "V2", INSTR_NYSE,
                    v2_5d_avg_volume_375d_high_within_20d),
            AtomReq("B6@60d_low", "B6", INSTR_SPX,
                    partial(b6_new_n_day_low, n=60)),
            AtomReq("P6@-10%", "P6", INSTR_SPX,
                    partial(p6_drawdown_from_252d_high, threshold=-0.10)),
        ),
        precedent_dates=("1957-10-21", "1962-06-26", "1976-09-22",
                         "2018-12-24", "2025-04-08"),
        source_quote="Combo C1 — Berg/Shapiro interview 2026-05-18",
        notes="V4 (NYSE 5d upside vol < 45%) and B3 (reverse breadth thrust) not implemented.",
        blocked=True,
        blocked_reason="V4 + B3 need NYSE upside/downside volume + A/D thrust feed.",
    ),
    "C2": ComboSpec(
        id="C2",
        name="April 4 2025 NDX panic + VIX 45-60",
        rationale="NDX 5d-vol 200d-high + 3d Nasdaq crash ≥7% + VIX intraday range 45-60.",
        atoms=(
            AtomReq("V1_NDX", "V1", INSTR_NDX, v1_5d_volume_at_200d_high),
            AtomReq("P1@-7%_NDX", "P1", INSTR_NDX, p1_3d_decline_at_least_7pct),
            AtomReq("VX1[45-60]", "VX1", INSTR_VIX, vx1_vix_in_range),
        ),
        precedent_dates=("1998-08-31", "2011-08-08", "2025-04-04"),
        source_quote="Combo C2 — Berg",
        notes="1998/2011 dates approximated to nearest panic close; Berg said 'precedents 1998, 2011'.",
    ),
    "C3": ComboSpec(
        id="C3",
        name="April 7 2025 NYSE vol + oscillator + breadth lows",
        rationale="V2 NYSE vol + NDR oscillator <10 + Nasdaq 5d (new lows − new highs) ≤ −16.",
        atoms=(
            AtomReq("V2_375d_within_20d", "V2", INSTR_NYSE,
                    v2_5d_avg_volume_375d_high_within_20d),
        ),
        precedent_dates=("1998-08-31", "2018-12-24", "2020-03-23", "2025-04-07"),
        source_quote="Combo C3 — Berg",
        notes="SE4 (NDR oscillator) + B4 (NASDAQ 5d new-lows minus new-highs) not implemented.",
        blocked=True,
        blocked_reason="SE4 + B4 atoms not implemented (need NDR oscillator + Nasdaq new-highs/lows feed).",
    ),
    "C4": ComboSpec(
        id="C4",
        name="April 9 2025 thrust day",
        rationale=(
            "Downtrend regime (close <250DMA) + ≥6% recovery from prior 10% decline "
            "into a new recovery high + biggest 1d gain in 250d + day-volume ≥+20%."
        ),
        atoms=(
            AtomReq("T1<250DMA", "T1", INSTR_SPX, t1_below_250d_sma),
            AtomReq("P7_6%_from_-10%", "P7", INSTR_SPX,
                    partial(p7_recovery_thrust, prior_drop_pct=-0.10, recovery_pct=0.06)),
            AtomReq("P2_250d_max_gain", "P2", INSTR_SPX, p2_1d_gain_is_250d_max),
            AtomReq("V5_vol+20%", "V5", INSTR_SPX, v5_signal_day_volume_up_20pct),
        ),
        precedent_dates=("1979-11-01", "1982-08-17", "1984-08-02",
                         "2003-03-17", "2003-10-15", "2025-04-09"),
        source_quote="Combo C4 — Berg",
        notes="1979/1982/1984 outside SPY history (starts 2010); 2003 dates need pre-history.",
    ),
    "C5": ComboSpec(
        id="C5",
        name="April 30 2025 held-low-7-days + recovery thrust",
        rationale="S&P down ≥5% but held low 7 days + ≥9% recovery into new high + NDR Multi-Cap A/D ≥2.10.",
        atoms=(
            AtomReq("T2_held@7d", "T2", INSTR_SPX,
                    partial(t2_held_low_for_at_least, n_days=7)),
            AtomReq("P7_9%_from_-5%", "P7", INSTR_SPX,
                    partial(p7_recovery_thrust, prior_drop_pct=-0.05, recovery_pct=0.09)),
        ),
        precedent_dates=("1974-10-04", "1998-10-15", "2002-10-15",
                         "2009-03-17", "2011-10-04", "2025-04-30"),
        source_quote="Combo C5 — Berg",
        notes="B1 (NDR Multi-Cap 7d A/D ≥2.10) not implemented — partial combo evaluation.",
        blocked=True,
        blocked_reason="B1 (NDR Multi-Cap 7-day A/D ratio) requires composite breadth feed.",
    ),
    "C6": ComboSpec(
        id="C6",
        name="April 2025 held-low-9 + 4-way thrust",
        rationale=(
            "S&P down 9% held low 9 days + new 30d recovery high + Nasdaq biggest "
            "10d gain in 180d + S&P biggest 10d ROC in 180d. Berg's strongest combo "
            "('100% of time up 120 days later across precedents')."
        ),
        atoms=(
            AtomReq("T2_held@9d", "T2", INSTR_SPX,
                    partial(t2_held_low_for_at_least, n_days=9)),
            AtomReq("B7@30d_high", "B7", INSTR_SPX,
                    partial(b7_new_n_day_closing_high, n=30)),
            AtomReq("P3_NDX_10d_180d_max", "P3", INSTR_NDX, p3_10d_gain_is_180d_max),
            AtomReq("P4_SPX_10d_180d_max", "P4", INSTR_SPX, p4_10d_roc_is_180d_max),
        ),
        precedent_dates=("1982-08-25", "1984-08-06", "1998-10-21",
                         "2011-10-14", "2026-04-13"),
        source_quote="Combo C6 — Berg",
        notes="Catalog lists '2026-04-13' as most recent firing; verified — combo fires "
              "on 2026-04-13 and 2026-04-14 in our data.",
    ),
    "C7": ComboSpec(
        id="C7",
        name="C6 + Russell 8-of-9 confirmation",
        rationale="C6 conditions plus Russell up 8 of 9 days (broadening breadth).",
        atoms=(
            AtomReq("T2_held@9d", "T2", INSTR_SPX,
                    partial(t2_held_low_for_at_least, n_days=9)),
            AtomReq("B7@30d_high", "B7", INSTR_SPX,
                    partial(b7_new_n_day_closing_high, n=30)),
            AtomReq("P3_NDX_10d_180d_max", "P3", INSTR_NDX, p3_10d_gain_is_180d_max),
            AtomReq("P4_SPX_10d_180d_max", "P4", INSTR_SPX, p4_10d_roc_is_180d_max),
            AtomReq("S5_RUT_8of9", "S5", INSTR_RUT,
                    partial(s5_up_n_of_m_days, n=8, m=9)),
        ),
        precedent_dates=("1982-08-23", "1998-10-21", "2026-04-13"),
        source_quote="Combo C7 — Berg",
        notes="Russell history < 252d in research.db (IWM stub only). Blocked until P0 backfill.",
        blocked=True,
        blocked_reason="Russell (IWM/RUT) history insufficient in research.db.",
    ),
    "C8": ComboSpec(
        id="C8",
        name="Oct 14 2011 NASDAQ volume thrust",
        rationale="S&P down 13% held low 9 days + Nasdaq 10d up/down vol ≥1.89 + Nasdaq 10d A/D ≥1.30.",
        atoms=(
            AtomReq("T2_held@9d", "T2", INSTR_SPX,
                    partial(t2_held_low_for_at_least, n_days=9)),
            AtomReq("P6@-13%", "P6", INSTR_SPX,
                    partial(p6_drawdown_from_252d_high, threshold=-0.13)),
        ),
        precedent_dates=("2011-10-14",),
        source_quote="Combo C8 — Berg",
        notes="V3 (Nasdaq 10d up/down vol thrust) + B2 (Nasdaq 10d A/D) not implemented.",
        blocked=True,
        blocked_reason="V3 + B2 atoms not implemented (need NASDAQ composite up/down volume + A/D feeds).",
    ),
    "C9": ComboSpec(
        id="C9",
        name="SOX parabolic at 2y SPX high",
        rationale=(
            "SOX 10d ROC ≥+20% + S&P at 2y closing high. Berg: 'never been bearish, "
            "continuation only' (1987-10-04 ran +28% before crash; 1997-05-05)."
        ),
        atoms=(
            AtomReq("P5_SOX_10d_20%", "P5", INSTR_SOX,
                    partial(p5_10d_roc_at_least_20pct, threshold=0.20)),
            AtomReq("B7_SPX_2y_high", "B7", INSTR_SPX,
                    partial(b7_new_n_day_closing_high, n=504)),
        ),
        precedent_dates=("1987-10-04", "1997-05-05"),
        source_quote="Combo C9 — Berg",
        notes="SOX (SOXX/SMH) history not in research.db — blocked until P0 backfill.",
        blocked=True,
        blocked_reason="No SOX / SOXX / SMH price history in research.db.",
    ),
    "C10": ComboSpec(
        id="C10",
        name="Late entry — held low 19d + Nasdaq 16-of-19",
        rationale=(
            "Nasdaq down 10% held low 19 days + Nasdaq up 16 of those 19. "
            "Continuation signal; historical max-DD post-signal ~0.2%."
        ),
        atoms=(
            # P6 historical: drawdown reached -10% at some point in past 60 days
            # (not necessarily today — Berg's "declined 10%" is regime, not instant).
            AtomReq("P6_HIST@-10%_60d_NDX", "P6_HIST", INSTR_NDX,
                    partial(p6_drawdown_reached_within, threshold=-0.10, lookback_days=60)),
            AtomReq("T2_held@19d_NDX", "T2", INSTR_NDX,
                    partial(t2_held_low_for_at_least, n_days=19)),
            AtomReq("S6_NDX_16of19", "S6", INSTR_NDX, s6_up_16_of_19_days),
        ),
        precedent_dates=("1985-11-04", "1986-10-30", "2023-11-22", "2026-04-27"),
        source_quote="Combo C10 — Berg",
        notes="1985/86 dates outside QQQ history; 2023 + 2026 testable.",
    ),
    "C11": ComboSpec(
        id="C11",
        name="IPO peak + VIX compression",
        rationale=(
            "4y high in IPO $ value + VIX 5d/20d stdev ratio < 0.87. Berg cites "
            "this to refute Twitter 'bearish' takes — historically not a sell signal."
        ),
        atoms=(
            AtomReq("VX2", "VX2", INSTR_VIX, vx2_vix_5d_20d_stdev_ratio),
        ),
        precedent_dates=("2014-11-21", "2020-05-29", "2020-06-05", "2026-05-15"),
        source_quote="Combo C11 — Berg",
        notes="SE1 (4y IPO $ value high) requires IPO calendar feed.",
        blocked=True,
        blocked_reason="SE1 (4y IPO $ value high) atom not implemented.",
    ),
}


# ============================================================================
# Public helpers
# ============================================================================

def list_combos(include_blocked: bool = True) -> list[ComboSpec]:
    """Return combos sorted by id; optionally filter out blocked ones."""
    out = sorted(BERG_COMBO_REGISTRY.values(), key=lambda s: s.id)
    if include_blocked:
        return out
    return [c for c in out if not c.blocked]


def evaluate_combo(
    combo_id: str, panel: Mapping[str, pd.DataFrame]
) -> pd.Series:
    """AND-reduce a combo's atom outputs into a bool Series of fire dates.

    Args:
        combo_id: registry key (e.g. ``"C2"``).
        panel: mapping of logical instrument id -> OHLCV DataFrame.

    Returns:
        bool ``pd.Series`` named ``combo_id``, indexed by the inner-joined
        date index of all participating instruments. True = combo fired.

    Raises:
        ValueError: if the combo is blocked (missing-data combo).
        KeyError: if a required instrument is not in ``panel``.
    """
    spec = BERG_COMBO_REGISTRY[combo_id]
    if spec.blocked:
        raise ValueError(
            f"Combo {combo_id} is blocked: {spec.blocked_reason}"
        )
    results: list[pd.Series] = []
    for req in spec.atoms:
        if req.instrument not in panel:
            raise KeyError(
                f"Combo {combo_id} requires instrument '{req.instrument}' "
                f"(missing from panel; have: {sorted(panel.keys())})"
            )
        s = req.detector(panel[req.instrument]).astype(bool)
        results.append(s.rename(req.label))
    aligned = pd.concat(results, axis=1, join="inner").fillna(False)
    return aligned.all(axis=1).rename(combo_id)


def evaluate_all_combos(
    panel: Mapping[str, pd.DataFrame],
    include_blocked: bool = False,
) -> pd.DataFrame:
    """Run every (unblocked) combo against ``panel``.

    Returns long-form DataFrame with columns ``date``, ``combo_id``, ``fired``.
    Blocked combos and combos missing instruments are skipped silently
    (their status is queryable via ``BERG_COMBO_REGISTRY``).
    """
    rows: list[dict] = []
    for combo_id, spec in BERG_COMBO_REGISTRY.items():
        if spec.blocked and not include_blocked:
            continue
        try:
            fired = evaluate_combo(combo_id, panel)
        except (KeyError, ValueError):
            continue
        for d in fired.index[fired.values]:
            rows.append({"date": d, "combo_id": combo_id, "fired": True})
    return pd.DataFrame(rows, columns=["date", "combo_id", "fired"])


def backtest_precedents(
    combo_id: str,
    panel: Mapping[str, pd.DataFrame],
    tolerance_days: int = 2,
    include_alt: bool = False,
) -> list[dict]:
    """Gate G2: for each named precedent, did the combo fire within ±tolerance?

    Args:
        combo_id: registry key.
        panel: instrument panel.
        tolerance_days: calendar-day window around each precedent date.
        include_alt: include ``alt_precedent_dates`` (transcription alternates).

    Returns list of dicts with keys ``date``, ``status``, ``match_dates``:
      * ``MATCH``   — combo fired within ±tolerance_days of precedent.
      * ``MISS``    — combo did not fire near precedent (in-range, no match).
      * ``OUT_OF_RANGE`` — precedent date outside available history.
    """
    spec = BERG_COMBO_REGISTRY[combo_id]
    fired = evaluate_combo(combo_id, panel)
    if fired.empty:
        return [
            {"date": d, "status": "NO_DATA", "match_dates": []}
            for d in spec.precedent_dates
        ]
    fire_dates = fired.index[fired.values]
    earliest, latest = fired.index.min(), fired.index.max()
    tol = pd.Timedelta(days=tolerance_days)

    candidates = list(spec.precedent_dates)
    if include_alt:
        candidates.extend(spec.alt_precedent_dates)

    out: list[dict] = []
    for ds in candidates:
        target = pd.Timestamp(ds)
        if target < earliest or target > latest:
            out.append({"date": ds, "status": "OUT_OF_RANGE", "match_dates": []})
            continue
        in_window = fire_dates[(fire_dates >= target - tol) & (fire_dates <= target + tol)]
        match_strs = [d.strftime("%Y-%m-%d") for d in in_window]
        out.append({
            "date": ds,
            "status": "MATCH" if match_strs else "MISS",
            "match_dates": match_strs,
        })
    return out


def precedent_summary(
    panel: Mapping[str, pd.DataFrame],
    tolerance_days: int = 2,
) -> pd.DataFrame:
    """Tabular Gate G2 summary across all live combos.

    Columns: combo_id, total_precedents, in_range, matches, hit_rate, status.
    """
    rows = []
    for combo_id, spec in BERG_COMBO_REGISTRY.items():
        if spec.blocked:
            rows.append({
                "combo_id": combo_id,
                "total_precedents": len(spec.precedent_dates),
                "in_range": 0,
                "matches": 0,
                "hit_rate": float("nan"),
                "status": f"BLOCKED: {spec.blocked_reason}",
            })
            continue
        try:
            results = backtest_precedents(combo_id, panel, tolerance_days)
        except (KeyError, ValueError) as e:
            rows.append({
                "combo_id": combo_id,
                "total_precedents": len(spec.precedent_dates),
                "in_range": 0,
                "matches": 0,
                "hit_rate": float("nan"),
                "status": f"ERROR: {e}",
            })
            continue
        in_range = sum(1 for r in results if r["status"] != "OUT_OF_RANGE")
        matches = sum(1 for r in results if r["status"] == "MATCH")
        hit_rate = matches / in_range if in_range > 0 else float("nan")
        rows.append({
            "combo_id": combo_id,
            "total_precedents": len(spec.precedent_dates),
            "in_range": in_range,
            "matches": matches,
            "hit_rate": hit_rate,
            "status": "LIVE",
        })
    return pd.DataFrame(rows)