/home/agent-jay/claudeCode/jarvis/finance/indicators/berg_atoms.py

berg_atoms.py — atomic indicator source

"""Milton Berg atomic indicators — histogram-tail extreme detectors.

Source: Milton Berg interview on Crowded Market Report (Jason Shapiro),
YouTube 68bnJkSZCzk, recorded 2026-05-18.

Methodology (Berg, paraphrased):
  - Daily market behavior is random in the middle of the distribution; ignore it.
  - Signals emerge only at extreme tails of an indicator's historical histogram.
  - A tradeable signal = 3-4 atoms firing on the same bar (a "combo").
  - Berg's full library: ~30,000 raw indicators, ~2,000 tradeable combos.
  - This module implements the ~17 atoms named explicitly in the May 2026 interview.

Each atom returns a boolean pd.Series aligned to the input DataFrame's index.
True = the extreme condition is satisfied on that bar.

CONFIDENCE LEVELS (see AtomSpec.confidence per atom)
  HIGH       (0.85-1.00) — formula unambiguous in source; pure math; reproducible.
  MEDIUM     (0.50-0.84) — formula reconstructed from Berg's verbal description.
  LOW        (0.30-0.49) — threshold inferred; needs validation against Berg's cited dates.
  SPECULATIVE (<0.30)   — substitute for proprietary Berg data (NDR, Market Vane).

Data contract:
  Input:  pd.DataFrame indexed by DatetimeIndex (daily), lowercase OHLCV columns.
          Required columns vary by atom — see AtomSpec.required_columns.
  Output: pd.Series[bool] aligned to input.index. NaN inputs propagate as False.

Numerical stability:
  - Insufficient-history rows return False (not NaN, not exception).
  - Division-by-zero guarded; missing-data tolerant.
  - Vectorized via pandas/numpy — no Python loops over time, except
    p7_recovery_thrust (per-bar loop over a bounded lookback window).
"""
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Literal, Mapping

import numpy as np
import pandas as pd


# ============================================================================
# Registry types
# ============================================================================

# Qualitative confidence band attached to each atom; the numeric ranges for
# each band are listed under "CONFIDENCE LEVELS" in the module docstring.
ConfidenceLevel = Literal["HIGH", "MEDIUM", "LOW", "SPECULATIVE"]
# Indicator family an atom belongs to; used as the type of AtomSpec.category.
Category = Literal["volume", "momentum", "streak", "breadth", "volatility", "sentiment", "trend"]


@dataclass(frozen=True)
class AtomSpec:
    """Structured metadata for one Berg atom.

    Instances are immutable (``frozen=True``). Each record pairs descriptive
    metadata (identifier, category, confidence, source quote) with the
    detector callable that implements the atom.
    """
    id: str                              # canonical ID (V1, P3, S5, ...)
    name: str                            # short human name
    category: Category                   # indicator family (one of the Category literals)
    confidence: float                    # 0.0 - 1.0
    confidence_level: ConfidenceLevel    # qualitative band for `confidence`
    confidence_rationale: str            # why we have this confidence
    source_quote: str                    # verbatim from Berg interview
    required_columns: tuple[str, ...]    # which OHLCV cols the func needs
    func: Callable[..., pd.Series]       # the detector


# ============================================================================
# Utilities (private)
# ============================================================================

def _require_columns(df: pd.DataFrame, cols: tuple[str, ...]) -> None:
    """Raise ValueError if any column is missing from df."""
    missing = [c for c in cols if c not in df.columns]
    if missing:
        raise ValueError(f"DataFrame missing required columns: {missing}")


def _bool_series(values: pd.Series | np.ndarray, index: pd.Index) -> pd.Series:
    """Return a clean boolean Series with no NaN (NaN→False)."""
    s = pd.Series(values, index=index)
    return s.fillna(False).astype(bool)


def _run_length(condition: pd.Series) -> pd.Series:
    """Length of the current True-run ending at each row.

    Example: [F,T,T,F,T,T,T] -> [0,1,2,0,1,2,3]
    """
    grp = (condition != condition.shift()).cumsum()
    return condition.groupby(grp).cumsum().astype(int)


# ============================================================================
# Volume atoms (V*)
# ============================================================================

def v1_5d_volume_at_200d_high(prices: pd.DataFrame) -> pd.Series:
    """V1: the 5-day volume total equals its own trailing 200-bar maximum.

    Berg context: "the NDX generated its highest 5-day volume in 200 days."
    Used at panic lows — extreme volume = capitulation OR thrust.

    Rows without a full 5-bar sum, or without 200 such sums behind them,
    come back False.

    Confidence: HIGH (0.95). Pure rolling-max calculation, formula unambiguous.
    """
    _require_columns(prices, ("volume",))
    volume = prices["volume"]
    five_day_total = volume.rolling(5, min_periods=5).sum()
    trailing_peak = five_day_total.rolling(200, min_periods=200).max()
    at_peak = five_day_total.eq(trailing_peak)
    return _bool_series(at_peak, prices.index)


def v2_5d_avg_volume_375d_high_within_20d(prices: pd.DataFrame) -> pd.Series:
    """V2: the 5-day average volume printed a 375-bar high on any of the last 20 bars.

    Berg context: "5-day average New York Stock Exchange volume was the highest
    in 375 days ... we consider that a high volume regime, which occurs at a
    turning point. We're not necessarily looking for high volume on the day
    of the low ... as long as there's high 5-day volume greatest in 1.5 years
    within the past 20 days, we consider that a high volume regime."

    Two stages: (1) flag bars where the 5-day mean equals its trailing 375-bar
    maximum; (2) carry that flag forward across a trailing 20-row window.

    Confidence: HIGH (0.90). Two-stage rolling-max; Berg's description is precise.
    """
    _require_columns(prices, ("volume",))
    volume = prices["volume"]
    avg_5d = volume.rolling(5, min_periods=5).mean()
    peak_375d = avg_5d.rolling(375, min_periods=375).max()
    made_new_high = avg_5d.eq(peak_375d)
    # The rolling max of a 0/1 flag is 1 when any row in the window was flagged.
    recent_flag = made_new_high.rolling(20, min_periods=1).max()
    return _bool_series(recent_flag.astype(bool), prices.index)


def v5_signal_day_volume_up_20pct(prices: pd.DataFrame) -> pd.Series:
    """V5: close is above the prior close AND volume is at least 1.20x the prior bar's.

    Berg context: "S&P was up on the day and volume increased by 20%."

    The first row has no prior bar and is therefore False.

    Confidence: HIGH (0.95). Direct verbal threshold.
    """
    _require_columns(prices, ("close", "volume"))
    close = prices["close"]
    volume = prices["volume"]
    closed_higher = close.gt(close.shift(1))
    volume_surge = volume.ge(1.20 * volume.shift(1))
    return _bool_series(closed_higher & volume_surge, prices.index)


# ============================================================================
# Price-momentum atoms (P*)
# ============================================================================

def p1_3d_decline_at_least_7pct(prices: pd.DataFrame, threshold: float = -0.07) -> pd.Series:
    """P1: the 3-bar percentage change in close is at or below ``threshold``.

    Berg context: "the Nasdaq declined 7% in 3 days ... at the tail end of a
    bear market ... you start seeing excessive momentum ... negative momentum
    into a low is a positive."

    Args:
        prices: frame with a 'close' column.
        threshold: fractional change cutoff (default -0.07, i.e. -7%).

    Confidence: HIGH (0.95). Simple pct_change with explicit threshold.
    """
    _require_columns(prices, ("close",))
    three_day_change = prices["close"].pct_change(3)
    panic_drop = three_day_change.le(threshold)
    return _bool_series(panic_drop, prices.index)


def p2_1d_gain_is_250d_max(prices: pd.DataFrame) -> pd.Series:
    """P2: the latest 1-bar percentage change equals its trailing 250-bar maximum.

    Berg context: "the S&P had its highest one-day gain in 250 days,
    a little over a year of trading."

    Rows with fewer than 250 one-bar changes behind them come back False.

    Confidence: HIGH (0.95). Rolling-max rank on pct_change.
    """
    _require_columns(prices, ("close",))
    daily_change = prices["close"].pct_change()
    best_in_250 = daily_change.rolling(250, min_periods=250).max()
    is_best = daily_change.eq(best_in_250)
    return _bool_series(is_best, prices.index)


def p3_10d_gain_is_180d_max(prices: pd.DataFrame) -> pd.Series:
    """P3: the 10-bar percentage change equals its trailing 180-bar maximum.

    Berg context: "Nasdaq generated its greatest 10-day gain in 180 trading days."

    Rows with fewer than 180 ten-bar changes behind them come back False.

    Confidence: HIGH (0.95).
    """
    _require_columns(prices, ("close",))
    ten_day_change = prices["close"].pct_change(10)
    best_in_180 = ten_day_change.rolling(180, min_periods=180).max()
    is_best = ten_day_change.eq(best_in_180)
    return _bool_series(is_best, prices.index)


def p4_10d_roc_is_180d_max(prices: pd.DataFrame) -> pd.Series:
    """P4: 10-day rate-of-change is at its 180-day high.

    Computationally identical to P3: this function delegates straight to
    ``p3_10d_gain_is_180d_max``. It exists as its own atom because Berg listed
    it as a separate condition in combo C6 ("greatest 10-day gain in 180 days"
    PLUS "greatest 10-day rate-of-change in 180 days"). His distinction is
    unclear — possibly two indices (Nasdaq vs S&P).

    Confidence: MEDIUM (0.65). Same math as P3 but Berg cited as distinct atom;
    treat as P3 applied to a second instrument (caller supplies which).
    """
    same_as_p3 = p3_10d_gain_is_180d_max(prices)
    return same_as_p3


def p5_10d_roc_at_least_20pct(prices: pd.DataFrame, threshold: float = 0.20) -> pd.Series:
    """P5: the 10-bar percentage change in close is at or above ``threshold``.

    Berg context: "the SOX 10-day rate of change was above 20%."

    Args:
        prices: frame with a 'close' column (typically SOX).
        threshold: fractional change cutoff (default 0.20, i.e. +20%).

    Confidence: HIGH (0.95).
    """
    _require_columns(prices, ("close",))
    ten_day_change = prices["close"].pct_change(10)
    strong_thrust = ten_day_change.ge(threshold)
    return _bool_series(strong_thrust, prices.index)


def p6_drawdown_from_252d_high(
    prices: pd.DataFrame, threshold: float = -0.10
) -> pd.Series:
    """P6: current close sits at least ``|threshold|`` below its trailing 252-bar high.

    The trailing high is the rolling maximum of close over up to 252 bars and
    is available once 20 bars exist. The drawdown is ``close / high - 1``.

    Berg's variants: -5%, -9%, -10%, -12%, -13%. Parameterized so combos can
    use any depth.

    Args:
        prices: frame with a 'close' column.
        threshold: drawdown cutoff as a negative fraction (default -0.10).

    Confidence: HIGH (0.90). Standard drawdown computation; Berg gave multiple
    thresholds across different combos.
    """
    _require_columns(prices, ("close",))
    close = prices["close"]
    trailing_high = close.rolling(252, min_periods=20).max()
    drawdown = close / trailing_high - 1.0
    deep_enough = drawdown.le(threshold)
    return _bool_series(deep_enough, prices.index)


def p6_drawdown_reached_within(
    prices: pd.DataFrame,
    threshold: float = -0.10,
    lookback_days: int = 60,
) -> pd.Series:
    """P6 (historical variant): the drawdown from the 252-bar high was at or
    below ``threshold`` on at least one bar of the trailing ``lookback_days`` window.

    Distinct from ``p6_drawdown_from_252d_high`` (current-bar reading): some
    Berg combos (C5, C6, C8, C10) describe "the market DECLINED X%, held the
    low for Y days, then thrusted" — the drawdown is a historical regime
    condition, not a same-bar condition. By the time the thrust/recovery
    fires, current drawdown has shrunk.

    Args:
        prices: OHLCV with 'close' column.
        threshold: drawdown depth (default -0.10). Use -0.05, -0.13 etc. per combo.
        lookback_days: trailing TRADING-day window in which the DD must have occurred.

    Confidence: HIGH (0.90). The historical-anywhere semantics match Berg's
    verbal usage in C5/C6/C10 ("the market declined ...").
    """
    _require_columns(prices, ("close",))
    close = prices["close"]
    trailing_high = close.rolling(252, min_periods=20).max()
    drawdown = close / trailing_high - 1.0
    breached = drawdown.le(threshold)
    # The rolling max of the flag is 1 when any bar in the window breached.
    breached_recently = breached.rolling(lookback_days, min_periods=1).max()
    return _bool_series(breached_recently.astype(bool), prices.index)


def p7_recovery_thrust(
    prices: pd.DataFrame,
    prior_drop_pct: float = -0.10,
    recovery_pct: float = 0.06,
    lookback: int = 60,
) -> pd.Series:
    """P7: market gained ≥ recovery_pct from its latest swing low (after a
    prior_drop_pct decline) INTO a new recovery high (highest close since the low).

    Berg context: "the S&P gained 6% from its last 10% decline into a new
    recovery high."

    Decomposition (per bar, over the trailing `lookback`-bar window that ends
    on the bar being evaluated):
      1. `swing_low` is the lowest close in the window.
      2. `swing_low` must sit ≥ |prior_drop_pct| below the highest close found
         between the start of that same window and the low (low bar included).
      3. Today's close is the highest close since `swing_low` (new recovery high).
      4. Today's close ≥ (1 + recovery_pct) × `swing_low`.

    Missing data: a bar whose own close is NaN is never flagged, and NaN closes
    elsewhere in the window are ignored. Without this, NaN would win the
    arg-min, make every guard comparison False, and flag the bar True.

    Args:
        prices: frame with a 'close' column.
        prior_drop_pct: required decline into the low, as a negative fraction.
        recovery_pct: required gain off the low, as a positive fraction.
        lookback: window length in bars.

    Returns:
        Boolean Series aligned to ``prices.index``; the first 10 rows are
        always False.

    Confidence: MEDIUM (0.70). Formula is reconstructed; Berg's "from its last
    10% decline" wording is interpretable.

    Implementation: per-bar loop, O(n * lookback). Acceptable for daily data
    (~25k bars × 60 = 1.5M ops). Vectorize later if it becomes a hotspot.
    """
    _require_columns(prices, ("close",))
    close = prices["close"].to_numpy(dtype=float)
    n = len(close)
    out = np.zeros(n, dtype=bool)
    min_history = 10  # need at least this many bars to identify a swing low

    for i in range(min_history, n):
        today = close[i]
        # A missing close can never be a signal bar; this also guarantees the
        # window below holds at least one finite value for the nan-reductions.
        if np.isnan(today):
            continue
        start = max(0, i - lookback + 1)
        window = close[start: i + 1]
        low_pos = int(np.nanargmin(window))      # index within window
        low = float(window[low_pos])
        # Highest close from the window start up to (and including) the low.
        high_before = float(np.nanmax(window[:low_pos + 1]))
        if high_before <= 0:
            continue
        # Step 2: drawdown depth check
        if (low / high_before - 1.0) > prior_drop_pct:
            continue
        # Step 3: today is the highest close since the low (inclusive of low day)
        if today < np.nanmax(window[low_pos:]) - 1e-12:
            continue
        # Step 4: recovery magnitude
        if today < low * (1.0 + recovery_pct):
            continue
        out[i] = True

    return pd.Series(out, index=prices.index)


def p8_single_day_gain_at_least(
    prices: pd.DataFrame, threshold: float = 0.09
) -> pd.Series:
    """P8: the 1-bar percentage change in close is at or above ``threshold``
    (capitulation reversal / thrust day).

    Berg context: "both the Nasdaq and the S&P gained more than 9% on the day."

    Args:
        prices: frame with a 'close' column.
        threshold: fractional gain cutoff (default 0.09, i.e. +9%).

    Confidence: HIGH (0.98).
    """
    _require_columns(prices, ("close",))
    daily_change = prices["close"].pct_change()
    big_up_day = daily_change.ge(threshold)
    return _bool_series(big_up_day, prices.index)


def p_roc74_5d_bull_trigger(
    prices: pd.DataFrame, threshold: float = 0.074
) -> pd.Series:
    """P_ROC74: the 5-bar percentage change in close is at or above ``threshold``
    (Berg's "every bull since 1928" trigger, default +7.4%).

    Source: Berg on Forward Guidance (Feb 2024) — "Every bull market since 1928
    has begun with a 5-day move of at least 7.4% on the S&P 500."

    Args:
        prices: frame with a 'close' column.
        threshold: fractional gain cutoff (default 0.074).

    Confidence: HIGH (0.90). Threshold is verbatim; ROC math unambiguous.
    Note: this is a necessary-not-sufficient condition — many such moves occur
    inside bear-market rallies. Use as filter, not standalone signal.
    """
    _require_columns(prices, ("close",))
    five_day_change = prices["close"].pct_change(5)
    triggered = five_day_change.ge(threshold)
    return _bool_series(triggered, prices.index)


# ============================================================================
# Streak atoms (S*)
# ============================================================================

def s1_consecutive_up_days(prices: pd.DataFrame, n: int = 10) -> pd.Series:
    """S1/S3: the current streak of higher closes is at least ``n`` bars long.

    Berg context (VIX): "to be up 10 days in a row was 0.01% of the time"
    Berg context (S&P): "you've seen 9 up days in a row, 11 up days in a row."

    Args:
        prices: frame with a 'close' column.
        n: minimum streak length (default 10).

    Confidence: HIGH (0.98).
    """
    _require_columns(prices, ("close",))
    close = prices["close"]
    closed_higher = close.gt(close.shift(1))
    streak = _run_length(closed_higher)
    return _bool_series(streak.ge(n), prices.index)


def s2_consecutive_down_days(prices: pd.DataFrame, n: int = 10) -> pd.Series:
    """S2/S4: the current streak of lower closes is at least ``n`` bars long.

    Berg context: "to be down 10 days in a row happened 0.04% of the time."

    Args:
        prices: frame with a 'close' column.
        n: minimum streak length (default 10).

    Confidence: HIGH (0.98).
    """
    _require_columns(prices, ("close",))
    close = prices["close"]
    closed_lower = close.lt(close.shift(1))
    streak = _run_length(closed_lower)
    return _bool_series(streak.ge(n), prices.index)


def s5_up_n_of_m_days(
    prices: pd.DataFrame, n: int = 8, m: int = 9
) -> pd.Series:
    """S5: at least ``n`` of the last ``m`` bars closed above the prior close
    (default 8 of 9).

    Berg context: "the Russell was up 8 out of 9 days."

    Rows with fewer than ``m`` bars of history come back False.

    Confidence: HIGH (0.95).
    """
    _require_columns(prices, ("close",))
    close = prices["close"]
    up_flag = close.gt(close.shift(1)).astype(int)
    ups_in_window = up_flag.rolling(m, min_periods=m).sum()
    return _bool_series(ups_in_window.ge(n), prices.index)


def s6_up_16_of_19_days(prices: pd.DataFrame) -> pd.Series:
    """S6: at least 16 of the last 19 bars were up closes (special case for combo C10).

    Berg context: "Nasdaq declined 10%, held its low for 19 days, and the
    Nasdaq was up 16 out of those 19 days."

    Delegates to ``s5_up_n_of_m_days`` with n=16 and m=19.

    Confidence: HIGH (0.95). Convenience wrapper around s5_up_n_of_m_days.
    """
    required_ups, window = 16, 19
    return s5_up_n_of_m_days(prices, n=required_ups, m=window)


# ============================================================================
# Volatility atoms (VX*)
# ============================================================================

def vx1_vix_in_range(
    vix: pd.DataFrame, low_bound: float = 45.0, high_bound: float = 60.0
) -> pd.Series:
    """VX1: the bar's high-low range overlaps the band [low_bound, high_bound].

    Berg context: "the VIX traded between 45 and 60. Anyone who knows history
    of the VIX, that's a very high spread."

    Two interpretations:
      (a) intraday range straddled the band (low <= high_bound AND high >= low_bound)
      (b) close was inside the band
    This function implements (a), the looser inclusive intraday reading.

    Args:
        vix: frame with 'high' and 'low' columns.
        low_bound: lower edge of the band (default 45.0).
        high_bound: upper edge of the band (default 60.0).

    Confidence: MEDIUM (0.70). Berg's exact criterion ("between 45 and 60")
    could mean closing range, intraday range, or specific high-low. We pick
    intraday range as the most literal reading.
    """
    _require_columns(vix, ("high", "low"))
    day_low = vix["low"]
    day_high = vix["high"]
    low_not_above_band = day_low.le(high_bound)
    high_not_below_band = day_high.ge(low_bound)
    return _bool_series(low_not_above_band & high_not_below_band, vix.index)


def vx_vix_5d_30d_avg_ratio(
    vix: pd.DataFrame,
    compression_threshold: float = 0.85,
    expansion_threshold: float = 1.50,
    mode: Literal["compression", "expansion"] = "compression",
) -> pd.Series:
    """VX_VIX_5_30: VIX 5-day mean / 30-day mean — compression or expansion extreme.

    Source: Berg X posts 2025-2026 — uses this as a regime-extreme reading
    distinct from VX2 (which is stdev-based, not mean-based).

    mode='compression' → ratio < compression_threshold (0.85 default) — extreme
        VIX collapse; sometimes precedes tops, sometimes precedes continuation.
    mode='expansion' → ratio > expansion_threshold (1.50 default) — VIX surging;
        capitulation regime, often near bottoms.

    Args:
        vix: frame with a 'close' column.
        compression_threshold: ratio ceiling used when mode='compression'.
        expansion_threshold: ratio floor used when mode='expansion'.
        mode: which extreme to detect.

    Returns:
        Boolean Series aligned to ``vix.index``. Rows without 30 closes of
        history, or with a zero 30-day mean, are False.

    Raises:
        ValueError: if 'close' is missing, or if ``mode`` is neither
            'compression' nor 'expansion'. Previously any unrecognised mode
            was silently treated as 'expansion'.

    Confidence: MEDIUM (0.70). Direction and rough thresholds match Berg's
    cited language; exact cutoffs reconstructed from screenshotted charts.
    """
    _require_columns(vix, ("close",))
    if mode not in ("compression", "expansion"):
        raise ValueError(
            f"mode must be 'compression' or 'expansion', got {mode!r}"
        )
    mean5 = vix["close"].rolling(5, min_periods=5).mean()
    mean30 = vix["close"].rolling(30, min_periods=30).mean()
    # A zero denominator becomes NaN so the comparison below yields False.
    ratio = mean5 / mean30.replace(0, np.nan)
    if mode == "compression":
        return _bool_series(ratio < compression_threshold, vix.index)
    return _bool_series(ratio > expansion_threshold, vix.index)


def vx2_vix_5d_20d_stdev_ratio(
    vix: pd.DataFrame, threshold: float = 0.87
) -> pd.Series:
    """VX2: ratio of the 5-bar to the 20-bar standard deviation of VIX daily
    returns is below ``threshold`` (compression).

    Berg context: "this is an extreme in VIX ... it's standard deviation of VIX.
    I don't give you the exact formula, but let's just say it's 5 days over 20
    days of VIX ... less than 0.87 means it's an extreme downward movement in
    VIX, which people are saying is bearish."

    A zero 20-bar standard deviation is treated as missing, so that row is False.

    Confidence: MEDIUM (0.60). Berg explicitly said "I don't give you the exact
    formula." We use return-stdev as the most natural interpretation; could also
    be level-stdev or absolute-stdev.
    """
    _require_columns(vix, ("close",))
    daily_return = vix["close"].pct_change()
    short_vol = daily_return.rolling(5, min_periods=5).std()
    long_vol = daily_return.rolling(20, min_periods=20).std()
    safe_long_vol = long_vol.replace(0, np.nan)
    vol_ratio = short_vol / safe_long_vol
    return _bool_series(vol_ratio.lt(threshold), vix.index)


# ============================================================================
# Trend / regime atoms (T*)
# ============================================================================

def t1_below_250d_sma(prices: pd.DataFrame) -> pd.Series:
    """T1: close is strictly below its 250-bar simple moving average (downtrend filter).

    Berg context: "the S&P is below its 250-day moving average, meaning that
    it's been in a downtrend ... we're using this as a filter."

    Rows with fewer than 250 closes of history come back False.

    Confidence: HIGH (0.99). Standard SMA.
    """
    _require_columns(prices, ("close",))
    close = prices["close"]
    long_average = close.rolling(250, min_periods=250).mean()
    under_average = close.lt(long_average)
    return _bool_series(under_average, prices.index)


def t2_days_holding_low(prices: pd.DataFrame, lookback: int = 252) -> pd.Series:
    """T2: number of days the most recent swing low has held (without violation).

    Returns an INT Series (not bool) — combos consume the count and threshold it.

    Berg context: "we count days off a low. Market's down 10%. It held its low
    for 1 day, 2 days, 3 days, 4 days, 7 days. In this case, S&P down 5% and
    held its low for 7 days."

    Algorithm:
      1. The "swing low" is the trailing min close over `lookback` days
         (defined once 10 closes are available).
      2. Count days whose close is strictly above that swing low (the hold count).
      3. On the day the swing low IS the close, count resets to 0.

    Only bars with ``close > swing_low`` add to the count. Bars where the
    swing low is not yet defined (warm-up) or the close is NaN contribute
    nothing, so the count is 0 throughout the warm-up. Counting them, as an
    earlier version did, made the count positive before any low existed and
    left it inflated afterwards.

    Args:
        prices: frame with a 'close' column.
        lookback: rolling window, in bars, used to define the swing low.

    Returns:
        Integer Series named "days_holding_low", aligned to ``prices.index``.

    Confidence: MEDIUM (0.70). Multiple valid definitions of "swing low" exist;
    we use rolling min as the most stable. Alternative: pivot-low detection.
    """
    _require_columns(prices, ("close",))
    close = prices["close"]
    low_lb = close.rolling(lookback, min_periods=10).min()
    # The day the low is touched: close == low_lb
    is_low_day = (close == low_lb)
    # A hold day is strictly above the low. The rolling min includes the
    # current close, so on rows where both are defined this is exactly "not a
    # low day"; the comparison is False when either side is NaN.
    is_hold_day = (close > low_lb)
    # Reset counter each time a new low is touched
    grp = is_low_day.cumsum()
    hold_days = is_hold_day.groupby(grp).cumsum()
    return hold_days.fillna(0).astype(int).rename("days_holding_low")


def t2_held_low_for_at_least(prices: pd.DataFrame, n_days: int) -> pd.Series:
    """T2 as a boolean: True where the hold count from ``t2_days_holding_low``
    (called with its default lookback) is at least ``n_days``."""
    hold_count = t2_days_holding_low(prices)
    held_long_enough = hold_count.ge(n_days)
    return _bool_series(held_long_enough, prices.index)


# ============================================================================
# Breadth atoms (B*) — partial; some require external data
# ============================================================================

def b6_new_n_day_low(prices: pd.DataFrame, n: int = 60) -> pd.Series:
    """B6: close is at (or below) the lowest close of the trailing ``n`` bars,
    current bar included.

    Berg context: "we want the S&P to make at least a new 60-day low."

    Rows with fewer than ``n`` closes of history come back False.

    Confidence: HIGH (0.95).
    """
    _require_columns(prices, ("close",))
    close = prices["close"]
    trailing_low = close.rolling(n, min_periods=n).min()
    at_new_low = close.le(trailing_low)
    return _bool_series(at_new_low, prices.index)


def b_ad_ratio_at_ath(
    prices: pd.DataFrame,
    ad_5d_ratio: pd.Series,
    threshold: float = 0.87,
    ath_lookback: int = 252,
    ath_proximity_days: int = 5,
) -> pd.Series:
    """B_AD_5D_087: NYSE 5-day A/D ratio ≤ threshold (default 0.87) while the
    index closed at its trailing high on at least one of the last
    `ath_proximity_days` bars.

    Source: Berg X — flags "breadth divergence at peak" — a topping precondition.

    "At the high" means the close equals the rolling `ath_lookback`-bar maximum
    (within a 1e-9 relative tolerance); the proximity window then keeps that
    condition alive for the following bars.

    Args:
        prices: OHLCV of the index (S&P or composite).
        ad_5d_ratio: pre-computed 5-day advancers/decliners ratio, indexed same
                     as `prices`. Caller supplies (Polygon NYSE breadth).
        threshold: A/D ratio ceiling for "weak breadth" (default 0.87).
        ath_lookback: trailing window defining the "all-time high" (252 = 1y).
        ath_proximity_days: how many days from ATH still counts as "at peak".

    Confidence: MEDIUM-HIGH (0.75). Direction and threshold quoted by Berg;
    "near ATH" framing requires interpretation (we use 5-day proximity).
    """
    _require_columns(prices, ("close",))
    close = prices["close"]
    trailing_high = close.rolling(ath_lookback, min_periods=ath_lookback).max()
    gap_to_high = close / trailing_high - 1.0
    # The best (closest-to-zero) gap in the proximity window; a value of ~0
    # means the high was touched on at least one of those bars.
    best_recent_gap = gap_to_high.rolling(ath_proximity_days, min_periods=1).max()
    near_peak = best_recent_gap >= -1e-9
    thin_breadth = ad_5d_ratio <= threshold
    return _bool_series(near_peak & thin_breadth, prices.index)


def b_da_extreme_threshold(
    decliners_to_advancers: pd.Series, threshold: float = 50.0
) -> pd.Series:
    """B_DA_SP600_50_1: index daily decliners/advancers ratio ≥ threshold.

    Source: Berg X 2026 — "S&P 600 closed 50-to-1 declining/advancing — 18 cases
    since 1995, 100% higher 120 days later."

    Caller supplies the D/A ratio series (e.g. for the S&P 600 small-caps); this
    function only applies the cutoff. Threshold is the most-cited 50.0 but
    lower thresholds (10:1, 20:1) also have history.

    Returns:
        Boolean Series on the input series' own index.

    Confidence: HIGH (0.80). Threshold + outcome verbatim; data source explicit.
    """
    lopsided = decliners_to_advancers >= threshold
    own_index = decliners_to_advancers.index
    return _bool_series(lopsided, own_index)


def t_rin_extreme(
    trin: pd.Series,
    panic_threshold: float = 12.5,
    rare_panic_threshold: float = 15.5,
    use_rare: bool = False,
) -> pd.Series:
    """T_RIN_HI: NYSE TRIN ≥ panic level (capitulation tape extreme).

    Source: Berg X / Forward Guidance — cites TRIN 15.50 on 2020-03-12 and
    TRIN 12.52 in Oct 2008 as historic panic prints.

    TRIN = (Advances/Declines) / (Up Volume / Down Volume).
    Caller supplies the TRIN series; this function only applies the cutoff.

    Args:
        trin: TRIN intraday-close or daily-close series.
        panic_threshold: 12.5 (more frequent capitulation).
        rare_panic_threshold: 15.5 (generational extreme).
        use_rare: if True, use rare_panic_threshold; else panic_threshold.

    Returns:
        Boolean Series on the input series' own index.

    Confidence: HIGH (0.85). Threshold values quoted directly by Berg with
    historical anchor dates.
    """
    if use_rare:
        cutoff = rare_panic_threshold
    else:
        cutoff = panic_threshold
    panic_print = trin >= cutoff
    return _bool_series(panic_print, trin.index)


def b7_new_n_day_closing_high(prices: pd.DataFrame, n: int = 30) -> pd.Series:
    """B7: close is at (or above) the highest close of the trailing ``n`` bars,
    current bar included (recovery high or breakout).

    Berg context (C6): "S&P has closed at a recovery high, which is a 30-day new high."
    Berg context (C9): "the S&P 500 closed at a 2-year high on a closing basis."

    Default 30 covers recovery-high case. For 2-year high use n=504.
    Rows with fewer than ``n`` closes of history come back False.

    Confidence: HIGH (0.95).
    """
    _require_columns(prices, ("close",))
    close = prices["close"]
    trailing_high = close.rolling(n, min_periods=n).max()
    at_new_high = close.ge(trailing_high)
    return _bool_series(at_new_high, prices.index)


# ============================================================================
# Atom registry — single source of truth for combos / dashboard
# ============================================================================

BERG_ATOM_REGISTRY: dict[str, AtomSpec] = {
    "V1": AtomSpec(
        id="V1", name="5d Volume 200d High", category="volume",
        confidence=0.95, confidence_level="HIGH",
        confidence_rationale="Pure rolling-max calculation; Berg's verbal description is exact.",
        source_quote='"the NDX generated its highest 5-day volume in 200 days"',
        required_columns=("volume",),
        func=v1_5d_volume_at_200d_high,
    ),
    "V2": AtomSpec(
        id="V2", name="5d Avg Volume 375d High Within 20d", category="volume",
        confidence=0.90, confidence_level="HIGH",
        confidence_rationale="Two-stage rolling max; Berg specifies window and lookback.",
        source_quote='"5-day average NYSE volume was the highest in 375 days ... within the past 20 days"',
        required_columns=("volume",),
        func=v2_5d_avg_volume_375d_high_within_20d,
    ),
    "V5": AtomSpec(
        id="V5", name="Signal Day Volume Up 20%", category="volume",
        confidence=0.95, confidence_level="HIGH",
        confidence_rationale="Direct verbal threshold.",
        source_quote='"the S&P was up on the day and volume increased by 20%"',
        required_columns=("close", "volume"),
        func=v5_signal_day_volume_up_20pct,
    ),
    "P1": AtomSpec(
        id="P1", name="3d Decline ≥7%", category="momentum",
        confidence=0.95, confidence_level="HIGH",
        confidence_rationale="Direct pct_change with explicit threshold.",
        source_quote='"the Nasdaq declined 7% in 3 days"',
        required_columns=("close",),
        func=p1_3d_decline_at_least_7pct,
    ),
    "P2": AtomSpec(
        id="P2", name="1d Gain is 250d Max", category="momentum",
        confidence=0.95, confidence_level="HIGH",
        confidence_rationale="Rolling-max rank, no ambiguity.",
        source_quote='"S&P had its highest one-day gain in 250 days"',
        required_columns=("close",),
        func=p2_1d_gain_is_250d_max,
    ),
    "P3": AtomSpec(
        id="P3", name="10d Gain is 180d Max", category="momentum",
        confidence=0.95, confidence_level="HIGH",
        confidence_rationale="Direct rolling-max rank.",
        source_quote='"Nasdaq generated its greatest 10-day gain in 180 trading days"',
        required_columns=("close",),
        func=p3_10d_gain_is_180d_max,
    ),
    "P4": AtomSpec(
        id="P4", name="10d ROC is 180d Max (companion)", category="momentum",
        confidence=0.65, confidence_level="MEDIUM",
        confidence_rationale="Berg cited as distinct from P3 in same combo; possibly different index.",
        source_quote='"S&P 500 also generated its greatest 10-day change high rate of change in 180 trading days"',
        required_columns=("close",),
        func=p4_10d_roc_is_180d_max,
    ),
    "P5": AtomSpec(
        id="P5", name="10d ROC ≥20% (SOX)", category="momentum",
        confidence=0.95, confidence_level="HIGH",
        confidence_rationale="Direct threshold on pct_change(10).",
        source_quote='"SOX 10-day rate of change was above 20%"',
        required_columns=("close",),
        func=p5_10d_roc_at_least_20pct,
    ),
    "P6": AtomSpec(
        id="P6", name="Drawdown from 252d High", category="momentum",
        confidence=0.90, confidence_level="HIGH",
        confidence_rationale="Standard drawdown; Berg gave multiple thresholds across combos.",
        source_quote='"S&P declined 9% and held this low" / "down 5%" / "down 13%"',
        required_columns=("close",),
        func=p6_drawdown_from_252d_high,
    ),
    "P6_HIST": AtomSpec(
        id="P6_HIST", name="Drawdown ≤threshold reached within lookback (historical)",
        category="momentum",
        confidence=0.90, confidence_level="HIGH",
        confidence_rationale="Historical-anywhere variant of P6; matches Berg's 'declined X%' verbal usage.",
        source_quote='"the market declined 10%, held the low for 19 days" — drawdown is regime, not same-bar.',
        required_columns=("close",),
        func=p6_drawdown_reached_within,
    ),
    "P7": AtomSpec(
        id="P7", name="Recovery Thrust into New High", category="momentum",
        confidence=0.70, confidence_level="MEDIUM",
        confidence_rationale="Multi-part reconstruction; Berg's 'from last 10% decline' is interpretable.",
        source_quote='"S&P gained 6% from its last 10% decline into a new recovery high"',
        required_columns=("close",),
        func=p7_recovery_thrust,
    ),
    "P8": AtomSpec(
        id="P8", name="Single-Day Gain ≥9%", category="momentum",
        confidence=0.98, confidence_level="HIGH",
        confidence_rationale="Trivial pct_change threshold.",
        source_quote='"both the Nasdaq and the S&P gained more than 9% on the day"',
        required_columns=("close",),
        func=p8_single_day_gain_at_least,
    ),
    "P_ROC74": AtomSpec(
        id="P_ROC74", name="5d ROC ≥7.4% (bull-market trigger)", category="momentum",
        confidence=0.90, confidence_level="HIGH",
        confidence_rationale="Threshold (7.4%) and outcome (bull-market initiation since 1928) quoted verbatim; pure pct_change(5).",
        source_quote='"Every bull market since 1928 has begun with a 5-day move of at least 7.4% on the S&P 500" (Forward Guidance Feb 2024)',
        required_columns=("close",),
        func=p_roc74_5d_bull_trigger,
    ),
    "S1": AtomSpec(
        id="S1", name="≥10 Consecutive Up Days (VIX)", category="streak",
        confidence=0.98, confidence_level="HIGH",
        confidence_rationale="Pure consecutive-day counter.",
        source_quote='"to be up 10 days in a row was 0.01% of the time"',
        required_columns=("close",),
        func=s1_consecutive_up_days,
    ),
    "S2": AtomSpec(
        id="S2", name="≥10 Consecutive Down Days (VIX)", category="streak",
        confidence=0.98, confidence_level="HIGH",
        confidence_rationale="Pure consecutive-day counter.",
        source_quote='"to be down 10 days in a row happened 0.04% of the time"',
        required_columns=("close",),
        func=s2_consecutive_down_days,
    ),
    "S5": AtomSpec(
        id="S5", name="Up 8 of 9 Days (Russell)", category="streak",
        confidence=0.95, confidence_level="HIGH",
        confidence_rationale="Direct rolling sum.",
        source_quote='"the Russell was up 8 out of 9 days"',
        required_columns=("close",),
        func=s5_up_n_of_m_days,
    ),
    "S6": AtomSpec(
        id="S6", name="Up 16 of 19 Days (Nasdaq)", category="streak",
        confidence=0.95, confidence_level="HIGH",
        confidence_rationale="Specific case of n-of-m up days.",
        source_quote='"Nasdaq was up 16 out of those 19 days"',
        required_columns=("close",),
        func=s6_up_16_of_19_days,
    ),
    "VX1": AtomSpec(
        id="VX1", name="VIX Intraday Range 45-60", category="volatility",
        confidence=0.70, confidence_level="MEDIUM",
        confidence_rationale="Berg said 'traded between 45 and 60'; could mean close, range, or intraday H/L.",
        source_quote='"the VIX traded between 45 and 60"',
        required_columns=("high", "low"),
        func=vx1_vix_in_range,
    ),
    "VX2": AtomSpec(
        id="VX2", name="VIX 5d/20d Stdev Ratio <0.87", category="volatility",
        confidence=0.60, confidence_level="MEDIUM",
        confidence_rationale="Berg explicitly: 'I don't give you the exact formula' — return-stdev is our reading.",
        source_quote='"5 days over 20 days of VIX ... less than 0.87 means an extreme downward movement"',
        required_columns=("close",),
        func=vx2_vix_5d_20d_stdev_ratio,
    ),
    "VX_VIX_5_30": AtomSpec(
        id="VX_VIX_5_30", name="VIX 5d/30d Mean Ratio (compression/expansion)", category="volatility",
        confidence=0.70, confidence_level="MEDIUM",
        confidence_rationale="Direction and rough thresholds match Berg's X posts; exact cutoffs reconstructed.",
        source_quote='Berg X posts 2025-2026 referencing "VIX 5/30 ratio" extremes',
        required_columns=("close",),
        func=vx_vix_5d_30d_avg_ratio,
    ),
    "T1": AtomSpec(
        id="T1", name="Close <250d SMA (downtrend filter)", category="trend",
        confidence=0.99, confidence_level="HIGH",
        confidence_rationale="Standard SMA filter; Berg specifies as filter only.",
        source_quote='"S&P is below its 250-day moving average, meaning that it\'s been in a downtrend"',
        required_columns=("close",),
        func=t1_below_250d_sma,
    ),
    "T2": AtomSpec(
        id="T2", name="Days Holding Low (count)", category="trend",
        confidence=0.70, confidence_level="MEDIUM",
        confidence_rationale="Multiple valid 'swing low' definitions; rolling-min chosen for stability.",
        source_quote='"we count days off a low ... held its low for 1, 2, 3, 4, 7, 9, 19 days"',
        required_columns=("close",),
        func=t2_days_holding_low,
    ),
    "B6": AtomSpec(
        id="B6", name="New N-Day Low (default 60)", category="breadth",
        confidence=0.95, confidence_level="HIGH",
        confidence_rationale="Rolling min; Berg specifies 60d.",
        source_quote='"the S&P to make at least a new 60-day low"',
        required_columns=("close",),
        func=b6_new_n_day_low,
    ),
    "B7": AtomSpec(
        id="B7", name="New N-Day Closing High", category="breadth",
        confidence=0.95, confidence_level="HIGH",
        confidence_rationale="Rolling max; Berg uses 30d (recovery) and 504d (2-year breakout).",
        source_quote='"S&P has closed at a recovery high, which is a 30-day new high" / "2-year high on a closing basis"',
        required_columns=("close",),
        func=b7_new_n_day_closing_high,
    ),
    "B_AD_5D_087": AtomSpec(
        id="B_AD_5D_087", name="NYSE 5d A/D ≤0.87 at ATH (breadth divergence at peak)",
        category="breadth",
        confidence=0.75, confidence_level="MEDIUM",
        confidence_rationale="Threshold (0.87) and direction quoted; 'at ATH' framing requires 5d-proximity interpretation.",
        source_quote='Berg X — breadth-divergence-at-peak precondition for tops',
        required_columns=("close", "__external_ad_5d_ratio__"),
        func=b_ad_ratio_at_ath,
    ),
    "B_DA_SP600_50_1": AtomSpec(
        id="B_DA_SP600_50_1", name="S&P 600 50-to-1 Decliners/Advancers", category="breadth",
        confidence=0.85, confidence_level="HIGH",
        confidence_rationale="Threshold (50:1) and outcome (18 cases since 1995, 100% up 120d) quoted verbatim.",
        source_quote='"S&P 600 closed 50-to-1 declining/advancing — 18 cases since 1995, 100% higher 120 days later"',
        required_columns=("__external_da_ratio__",),
        func=b_da_extreme_threshold,
    ),
    "T_RIN_HI": AtomSpec(
        id="T_RIN_HI", name="NYSE TRIN ≥12.5 (panic) or ≥15.5 (rare panic)", category="breadth",
        confidence=0.85, confidence_level="HIGH",
        confidence_rationale="Thresholds (12.5 / 15.5) anchored to historic dates (2020-03-12: 15.50, Oct 2008: 12.52).",
        source_quote='Berg cites TRIN 15.50 (Mar 2020) and 12.52 (Oct 2008) as generational panic prints',
        required_columns=("__external_trin__",),
        func=t_rin_extreme,
    ),
}


# ============================================================================
# Public helpers
# ============================================================================

def list_atoms(min_confidence: float = 0.0) -> list[AtomSpec]:
    """List registered atoms at or above a confidence floor.

    Args:
        min_confidence: inclusive lower bound on ``AtomSpec.confidence``;
                        the default of 0.0 applies no effective filter to
                        non-negative confidences.

    Returns:
        The matching AtomSpecs from BERG_ATOM_REGISTRY, ordered by ``id``.
    """
    qualifying = [
        spec
        for spec in BERG_ATOM_REGISTRY.values()
        if spec.confidence >= min_confidence
    ]
    # In-place stable sort on the atom ID string.
    qualifying.sort(key=lambda spec: spec.id)
    return qualifying


def evaluate_all(
    instrument_panel: Mapping[str, pd.DataFrame],
    atom_ids: list[str] | None = None,
) -> pd.DataFrame:
    """Evaluate atoms across an instrument panel.

    Every requested atom is run against every instrument whose DataFrame
    carries all of the atom's ``required_columns``; instruments missing a
    required column are skipped silently for that atom. Evaluation is
    best-effort: if an atom raises on an instrument, the failure is printed
    and the remaining (atom, instrument) pairs are still evaluated.

    Args:
        instrument_panel: dict mapping instrument symbol (e.g., 'SPY', 'QQQ', 'VIX')
                          to OHLCV DataFrame.
        atom_ids: optional subset; default = all atoms.

    Returns:
        long-form DataFrame with columns:
          date, atom_id, instrument, fired (bool)
        Only bars on which an atom fired are emitted, so ``fired`` is True
        on every row. The four columns are always present, even when no
        atom fired anywhere (the frame is then empty).

    Raises:
        KeyError: if an entry of ``atom_ids`` is not in BERG_ATOM_REGISTRY.
    """
    # Fixed output schema; also guarantees the columns exist on an empty result.
    columns = ["date", "atom_id", "instrument", "fired"]

    if atom_ids is None:
        atom_ids = list(BERG_ATOM_REGISTRY.keys())

    rows = []
    for atom_id in atom_ids:
        # Unknown IDs are a caller error and deliberately propagate.
        spec = BERG_ATOM_REGISTRY[atom_id]
        for instrument, df in instrument_panel.items():
            try:
                if not all(c in df.columns for c in spec.required_columns):
                    continue
                result = spec.func(df)
                # Boolean-mask the series with itself to keep only fired bars.
                fired_dates = result[result].index
            except Exception as e:
                # In production, route through trackError
                print(f"[berg_atoms] atom {atom_id} on {instrument} failed: {e}")
                continue
            rows.extend(
                {
                    "date": d, "atom_id": atom_id,
                    "instrument": instrument, "fired": True,
                }
                for d in fired_dates
            )
    return pd.DataFrame(rows, columns=columns)