# /home/agent-jay/claudeCode/jarvis/finance/indicators/berg_atoms.py
# berg_atoms.py — atomic indicator source
"""Milton Berg atomic indicators — histogram-tail extreme detectors.
Source: Milton Berg interview on Crowded Market Report (Jason Shapiro),
YouTube 68bnJkSZCzk, recorded 2026-05-18.
Methodology (Berg, paraphrased):
- Daily market behavior is random in the middle of the distribution; ignore it.
- Signals emerge only at extreme tails of an indicator's historical histogram.
- A tradeable signal = 3-4 atoms firing on the same bar (a "combo").
- Berg's full library: ~30,000 raw indicators, ~2,000 tradeable combos.
- This module implements the ~17 atoms named explicitly in the May 2026 interview.
Each atom returns a boolean pd.Series aligned to the input DataFrame's index
(the one exception is T2, `t2_days_holding_low`, which returns an integer day count).
True = the extreme condition is satisfied on that bar.
CONFIDENCE LEVELS (see AtomSpec.confidence per atom)
HIGH (0.85-1.00) — formula unambiguous in source; pure math; reproducible.
MEDIUM (0.50-0.84) — formula reconstructed from Berg's verbal description.
LOW (0.30-0.49) — threshold inferred; needs validation against Berg's cited dates.
SPECULATIVE (<0.30) — substitute for proprietary Berg data (NDR, Market Vane).
Data contract:
Input: pd.DataFrame indexed by DatetimeIndex (daily), lowercase OHLCV columns.
Required columns vary by atom — see AtomSpec.required_columns.
Output: pd.Series[bool] aligned to input.index. NaN inputs propagate as False.
Numerical stability:
- Insufficient-history rows return False (not NaN, not exception).
- Division-by-zero guarded; missing-data tolerant.
- Vectorized via pandas/numpy wherever practical; the one exception is
  `p7_recovery_thrust`, which uses a per-bar loop.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Callable, Literal, Mapping
import numpy as np
import pandas as pd
# ============================================================================
# Registry types
# ============================================================================
# Qualitative confidence band attached to each atom; the numeric range that
# corresponds to each band is listed in the module docstring.
ConfidenceLevel = Literal["HIGH", "MEDIUM", "LOW", "SPECULATIVE"]
# Indicator family an atom belongs to (stored in AtomSpec.category).
Category = Literal["volume", "momentum", "streak", "breadth", "volatility", "sentiment", "trend"]
@dataclass(frozen=True)
class AtomSpec:
    """Immutable metadata record describing a single Berg atom.

    Attributes:
        id: Canonical atom ID (V1, P3, S5, ...).
        name: Short human-readable name.
        category: Indicator family (one of the ``Category`` literals).
        confidence: Numeric confidence score, 0.0 - 1.0.
        confidence_level: Qualitative band matching ``confidence``.
        confidence_rationale: Why this confidence was assigned.
        source_quote: Quote or citation the atom is based on.
        required_columns: Column names the detector needs on its input.
        func: The detector callable; returns a ``pd.Series``.
    """

    id: str
    name: str
    category: Category
    confidence: float
    confidence_level: ConfidenceLevel
    confidence_rationale: str
    source_quote: str
    required_columns: tuple[str, ...]
    func: Callable[..., pd.Series]
# ============================================================================
# Utilities (private)
# ============================================================================
def _require_columns(df: pd.DataFrame, cols: tuple[str, ...]) -> None:
"""Raise ValueError if any column is missing from df."""
missing = [c for c in cols if c not in df.columns]
if missing:
raise ValueError(f"DataFrame missing required columns: {missing}")
def _bool_series(values: pd.Series | np.ndarray, index: pd.Index) -> pd.Series:
"""Return a clean boolean Series with no NaN (NaN→False)."""
s = pd.Series(values, index=index)
return s.fillna(False).astype(bool)
def _run_length(condition: pd.Series) -> pd.Series:
"""Length of the current True-run ending at each row.
Example: [F,T,T,F,T,T,T] -> [0,1,2,0,1,2,3]
"""
grp = (condition != condition.shift()).cumsum()
return condition.groupby(grp).cumsum().astype(int)
# ============================================================================
# Volume atoms (V*)
# ============================================================================
def v1_5d_volume_at_200d_high(prices: pd.DataFrame) -> pd.Series:
    """V1: the 5-day volume total equals its own trailing 200-row maximum.

    Berg context: "the NDX generated its highest 5-day volume in 200 days."
    Used at panic lows — extreme volume = capitulation OR thrust.

    Both rolling windows require full history, so early rows are False.

    Confidence: HIGH (0.95). Pure rolling-max calculation, formula unambiguous.
    """
    _require_columns(prices, ("volume",))
    five_day_total = prices["volume"].rolling(window=5, min_periods=5).sum()
    trailing_peak = five_day_total.rolling(window=200, min_periods=200).max()
    at_peak = five_day_total.eq(trailing_peak)
    return _bool_series(at_peak, prices.index)
def v2_5d_avg_volume_375d_high_within_20d(prices: pd.DataFrame) -> pd.Series:
    """V2: the 5-day mean volume set a 375-row high on any of the last 20 rows.

    Berg context: "5-day average New York Stock Exchange volume was the highest
    in 375 days ... we consider that a high volume regime, which occurs at a
    turning point. We're not necessarily looking for high volume on the day
    of the low ... as long as there's high 5-day volume greatest in 1.5 years
    within the past 20 days, we consider that a high volume regime."

    Confidence: HIGH (0.90). Two-stage rolling-max; Berg's description is precise.
    """
    _require_columns(prices, ("volume",))
    five_day_mean = prices["volume"].rolling(window=5, min_periods=5).mean()
    record_level = five_day_mean.rolling(window=375, min_periods=375).max()
    set_record = five_day_mean.eq(record_level)
    # Rolling max over the flag is 1 when any of the trailing 20 rows
    # (including today) set a record.
    recent_record = set_record.rolling(window=20, min_periods=1).max()
    return _bool_series(recent_record.astype(bool), prices.index)
def v5_signal_day_volume_up_20pct(prices: pd.DataFrame) -> pd.Series:
    """V5: close above the prior close AND volume at least 1.20x the prior row's.

    Berg context: "S&P was up on the day and volume increased by 20%."

    Confidence: HIGH (0.95). Direct verbal threshold.
    """
    _require_columns(prices, ("close", "volume"))
    close = prices["close"]
    volume = prices["volume"]
    closed_higher = close.gt(close.shift(1))
    volume_surged = volume.ge(1.20 * volume.shift(1))
    return _bool_series(closed_higher & volume_surged, prices.index)
# ============================================================================
# Price-momentum atoms (P*)
# ============================================================================
def p1_3d_decline_at_least_7pct(prices: pd.DataFrame, threshold: float = -0.07) -> pd.Series:
    """P1: the 3-row percentage change in close is at or below ``threshold``.

    Berg context: "the Nasdaq declined 7% in 3 days ... at the tail end of a
    bear market ... you start seeing excessive momentum ... negative momentum
    into a low is a positive."

    Args:
        prices: frame with a 'close' column.
        threshold: fractional change cutoff (default -0.07, i.e. -7%).

    Confidence: HIGH (0.95). Simple pct_change with explicit threshold.
    """
    _require_columns(prices, ("close",))
    three_day_change = prices["close"].pct_change(3)
    panic = three_day_change.le(threshold)
    return _bool_series(panic, prices.index)
def p2_1d_gain_is_250d_max(prices: pd.DataFrame) -> pd.Series:
    """P2: today's 1-row pct change equals the largest of the trailing 250.

    Berg context: "the S&P had its highest one-day gain in 250 days,
    a little over a year of trading."

    Confidence: HIGH (0.95). Rolling-max rank on pct_change.
    """
    _require_columns(prices, ("close",))
    daily_change = prices["close"].pct_change()
    best_in_window = daily_change.rolling(window=250, min_periods=250).max()
    is_best = daily_change.eq(best_in_window)
    return _bool_series(is_best, prices.index)
def p3_10d_gain_is_180d_max(prices: pd.DataFrame) -> pd.Series:
    """P3: the 10-row pct change equals its own trailing 180-row maximum.

    Berg context: "Nasdaq generated its greatest 10-day gain in 180 trading days."

    Confidence: HIGH (0.95).
    """
    _require_columns(prices, ("close",))
    ten_day_change = prices["close"].pct_change(10)
    best_in_window = ten_day_change.rolling(window=180, min_periods=180).max()
    is_best = ten_day_change.eq(best_in_window)
    return _bool_series(is_best, prices.index)
def p4_10d_roc_is_180d_max(prices: pd.DataFrame) -> pd.Series:
    """P4: 10-day rate-of-change at its 180-day high (companion to P3).

    Berg listed this as a separate condition from P3 in combo C6 ("greatest
    10-day gain in 180 days" PLUS "greatest 10-day rate-of-change in 180
    days"). His distinction is unclear — possibly two indices (Nasdaq vs S&P).

    The computation is delegated unchanged to ``p3_10d_gain_is_180d_max``; the
    caller is expected to supply the second instrument's prices.

    Confidence: MEDIUM (0.65). Same math as P3 but cited as a distinct atom.
    """
    companion_signal = p3_10d_gain_is_180d_max(prices)
    return companion_signal
def p5_10d_roc_at_least_20pct(prices: pd.DataFrame, threshold: float = 0.20) -> pd.Series:
    """P5: the 10-row pct change in close is at or above ``threshold``.

    Berg context: "the SOX 10-day rate of change was above 20%."

    Args:
        prices: frame with a 'close' column (typically SOX).
        threshold: fractional change cutoff (default 0.20, i.e. +20%).

    Confidence: HIGH (0.95).
    """
    _require_columns(prices, ("close",))
    ten_day_change = prices["close"].pct_change(10)
    surged = ten_day_change.ge(threshold)
    return _bool_series(surged, prices.index)
def p6_drawdown_from_252d_high(
    prices: pd.DataFrame, threshold: float = -0.10
) -> pd.Series:
    """P6: today's close sits at least ``|threshold|`` below the trailing 252-row high.

    Berg's variants: -5%, -9%, -10%, -12%, -13%. The depth is a parameter so
    each combo can pick its own.

    The trailing high is available once 20 closes exist in the window; earlier
    rows are False.

    Confidence: HIGH (0.90). Standard drawdown computation; Berg gave multiple
    thresholds across different combos.
    """
    _require_columns(prices, ("close",))
    close = prices["close"]
    running_peak = close.rolling(window=252, min_periods=20).max()
    drawdown = close / running_peak - 1.0
    deep_enough = drawdown.le(threshold)
    return _bool_series(deep_enough, prices.index)
def p6_drawdown_reached_within(
    prices: pd.DataFrame,
    threshold: float = -0.10,
    lookback_days: int = 60,
) -> pd.Series:
    """P6 (historical variant): the drawdown from the 252-row high was at or
    below ``threshold`` on at least one of the trailing ``lookback_days`` rows.

    Unlike ``p6_drawdown_from_252d_high`` (a current-bar reading), this treats
    the decline as a regime condition: some Berg combos (C5, C6, C8, C10)
    describe "the market DECLINED X%, held the low for Y days, then thrusted",
    and by the time the thrust fires the current drawdown has already shrunk.

    Args:
        prices: OHLCV with 'close' column.
        threshold: drawdown depth (default -0.10). Use -0.05, -0.13 etc. per combo.
        lookback_days: trailing window, in rows, in which the drawdown must
            have occurred (today included).

    Confidence: HIGH (0.90). The historical-anywhere semantics match Berg's
    verbal usage in C5/C6/C10 ("the market declined ...").
    """
    _require_columns(prices, ("close",))
    close = prices["close"]
    running_peak = close.rolling(window=252, min_periods=20).max()
    drawdown = close / running_peak - 1.0
    breached_today = drawdown <= threshold
    # Rolling max of the daily flag is 1 if any row in the window breached.
    breached_recently = breached_today.rolling(lookback_days, min_periods=1).max()
    return _bool_series(breached_recently.astype(bool), prices.index)
def p7_recovery_thrust(
    prices: pd.DataFrame,
    prior_drop_pct: float = -0.10,
    recovery_pct: float = 0.06,
    lookback: int = 60,
) -> pd.Series:
    """P7: close has gained ≥ recovery_pct from the window's swing low (which
    itself followed a decline of at least |prior_drop_pct|) and is a new
    recovery high (highest close since that low).

    Berg context: "the S&P gained 6% from its last 10% decline into a new
    recovery high."

    Per-bar procedure (window = the last `lookback` closes, today included):
    1. `swing_low` = lowest close in the window (earliest one on ties).
    2. The highest close at or before `swing_low` inside the window must
       satisfy `swing_low / high - 1 <= prior_drop_pct`.
    3. Today's close is the highest close since `swing_low` (1e-12 tolerance).
    4. Today's close ≥ (1 + recovery_pct) × `swing_low`.

    Missing data: a bar whose own close is non-finite never fires, and
    non-finite closes inside the window are ignored. Without this, a single
    NaN in the window makes every rejection test compare against NaN (always
    False), so the bar would be flagged True.

    Args:
        prices: frame with a 'close' column.
        prior_drop_pct: required decline into the low, as a negative fraction.
        recovery_pct: required gain off the low, as a positive fraction.
        lookback: window length in rows; must be >= 1.

    Returns:
        Boolean Series on ``prices.index``. The first 10 rows are always False.

    Raises:
        ValueError: if 'close' is missing or ``lookback`` < 1.

    Confidence: MEDIUM (0.70). Formula is reconstructed; Berg's "from its last
    10% decline" wording is interpretable.

    Implementation: per-bar loop, O(n * lookback). Acceptable for daily data
    (~25k bars × 60 = 1.5M ops). Vectorize later if it becomes a hotspot.
    """
    _require_columns(prices, ("close",))
    if lookback < 1:
        raise ValueError(f"lookback must be >= 1, got {lookback}")
    close = prices["close"].to_numpy(dtype=float, na_value=np.nan)
    n = len(close)
    out = np.zeros(n, dtype=bool)
    min_history = 10  # need at least this many bars to identify a swing low
    for i in range(min_history, n):
        today = close[i]
        if not np.isfinite(today):
            continue
        start = max(0, i - lookback + 1)
        window = close[start: i + 1]
        # Drop gaps; order is preserved and today's (finite) close stays last,
        # so the filtered window is never empty.
        window = window[np.isfinite(window)]
        low_pos = int(np.argmin(window))  # index within the filtered window
        low = float(window[low_pos])
        # Highest close up to and including the low, in the same window.
        high_before = float(window[: low_pos + 1].max())
        if high_before <= 0:
            continue
        # Step 2: drawdown depth check
        if (low / high_before - 1.0) > prior_drop_pct:
            continue
        # Step 3: today is the highest close since the low (inclusive of low day)
        if today < window[low_pos:].max() - 1e-12:
            continue
        # Step 4: recovery magnitude
        if today < low * (1.0 + recovery_pct):
            continue
        out[i] = True
    return pd.Series(out, index=prices.index)
def p8_single_day_gain_at_least(
    prices: pd.DataFrame, threshold: float = 0.09
) -> pd.Series:
    """P8: the 1-row pct change in close is at or above ``threshold`` (default +9%).

    Capitulation reversal / thrust day.
    Berg context: "both the Nasdaq and the S&P gained more than 9% on the day."

    Confidence: HIGH (0.98).
    """
    _require_columns(prices, ("close",))
    daily_change = prices["close"].pct_change()
    big_gain = daily_change.ge(threshold)
    return _bool_series(big_gain, prices.index)
def p_roc74_5d_bull_trigger(
    prices: pd.DataFrame, threshold: float = 0.074
) -> pd.Series:
    """P_ROC74: the 5-row pct change in close is at or above ``threshold`` (+7.4%).

    Source: Berg on Forward Guidance (Feb 2024) — "Every bull market since 1928
    has begun with a 5-day move of at least 7.4% on the S&P 500."

    Note: this is a necessary-not-sufficient condition — many such moves occur
    inside bear-market rallies. Use as filter, not standalone signal.

    Confidence: HIGH (0.90). Threshold is verbatim; ROC math unambiguous.
    """
    _require_columns(prices, ("close",))
    five_day_change = prices["close"].pct_change(5)
    triggered = five_day_change.ge(threshold)
    return _bool_series(triggered, prices.index)
# ============================================================================
# Streak atoms (S*)
# ============================================================================
def s1_consecutive_up_days(prices: pd.DataFrame, n: int = 10) -> pd.Series:
    """S1/S3: the current streak of higher closes is at least ``n`` long.

    Berg context (VIX): "to be up 10 days in a row was 0.01% of the time"
    Berg context (S&P): "you've seen 9 up days in a row, 11 up days in a row."

    Confidence: HIGH (0.98).
    """
    _require_columns(prices, ("close",))
    close = prices["close"]
    closed_higher = close.gt(close.shift(1))
    streak = _run_length(closed_higher)
    return _bool_series(streak.ge(n), prices.index)
def s2_consecutive_down_days(prices: pd.DataFrame, n: int = 10) -> pd.Series:
    """S2/S4: the current streak of lower closes is at least ``n`` long.

    Berg context: "to be down 10 days in a row happened 0.04% of the time."

    Confidence: HIGH (0.98).
    """
    _require_columns(prices, ("close",))
    close = prices["close"]
    closed_lower = close.lt(close.shift(1))
    streak = _run_length(closed_lower)
    return _bool_series(streak.ge(n), prices.index)
def s5_up_n_of_m_days(
    prices: pd.DataFrame, n: int = 8, m: int = 9
) -> pd.Series:
    """S5: at least ``n`` of the last ``m`` rows closed higher (default 8 of 9).

    Berg context: "the Russell was up 8 out of 9 days."

    A full window of ``m`` rows is required; earlier rows are False.

    Confidence: HIGH (0.95).
    """
    _require_columns(prices, ("close",))
    close = prices["close"]
    up_flag = close.gt(close.shift(1)).astype(int)
    ups_in_window = up_flag.rolling(window=m, min_periods=m).sum()
    return _bool_series(ups_in_window.ge(n), prices.index)
def s6_up_16_of_19_days(prices: pd.DataFrame) -> pd.Series:
    """S6: at least 16 higher closes in the last 19 rows (used by combo C10).

    Berg context: "Nasdaq declined 10%, held its low for 19 days, and the
    Nasdaq was up 16 out of those 19 days."

    Fixed-parameter shortcut for ``s5_up_n_of_m_days``.

    Confidence: HIGH (0.95).
    """
    return s5_up_n_of_m_days(prices, m=19, n=16)
# ============================================================================
# Volatility atoms (VX*)
# ============================================================================
def vx1_vix_in_range(
    vix: pd.DataFrame, low_bound: float = 45.0, high_bound: float = 60.0
) -> pd.Series:
    """VX1: the day's VIX low-high range overlaps [low_bound, high_bound].

    Berg context: "the VIX traded between 45 and 60. Anyone who knows history
    of the VIX, that's a very high spread."

    Two readings are possible:
    (a) intraday range straddled the band (low <= high_bound AND high >= low_bound)
    (b) close was inside the band
    This implements (a), the looser intraday reading.

    Confidence: MEDIUM (0.70). "Between 45 and 60" could mean closing range,
    intraday range, or a specific high-low; intraday overlap is taken as the
    most literal reading.
    """
    _require_columns(vix, ("high", "low"))
    low_not_above_band = vix["low"].le(high_bound)
    high_not_below_band = vix["high"].ge(low_bound)
    overlaps_band = low_not_above_band & high_not_below_band
    return _bool_series(overlaps_band, vix.index)
def vx_vix_5d_30d_avg_ratio(
    vix: pd.DataFrame,
    compression_threshold: float = 0.85,
    expansion_threshold: float = 1.50,
    mode: Literal["compression", "expansion"] = "compression",
) -> pd.Series:
    """VX_VIX_5_30: VIX 5-day mean / 30-day mean — compression or expansion extreme.

    Source: Berg X posts 2025-2026 — uses this as a regime-extreme reading
    distinct from VX2 (which is stdev-based, not mean-based).

    mode='compression' → ratio < compression_threshold (0.85 default) — extreme
    VIX collapse; sometimes precedes tops, sometimes precedes continuation.
    mode='expansion' → ratio > expansion_threshold (1.50 default) — VIX surging;
    capitulation regime, often near bottoms.

    Args:
        vix: frame with a 'close' column.
        compression_threshold: upper bound on the ratio in compression mode.
        expansion_threshold: lower bound on the ratio in expansion mode.
        mode: which extreme to detect.

    Returns:
        Boolean Series on ``vix.index``; rows without a full 30-row window, or
        with a zero 30-day mean, are False.

    Raises:
        ValueError: if 'close' is missing, or ``mode`` is neither
            'compression' nor 'expansion' (previously any other value was
            silently evaluated as expansion).

    Confidence: MEDIUM (0.70). Direction and rough thresholds match Berg's
    cited language; exact cutoffs reconstructed from screenshotted charts.
    """
    if mode not in ("compression", "expansion"):
        raise ValueError(f"mode must be 'compression' or 'expansion', got {mode!r}")
    _require_columns(vix, ("close",))
    mean5 = vix["close"].rolling(5, min_periods=5).mean()
    mean30 = vix["close"].rolling(30, min_periods=30).mean()
    # Zero denominator becomes NaN so the comparison below yields False.
    ratio = mean5 / mean30.replace(0, np.nan)
    if mode == "compression":
        return _bool_series(ratio < compression_threshold, vix.index)
    return _bool_series(ratio > expansion_threshold, vix.index)
def vx2_vix_5d_20d_stdev_ratio(
    vix: pd.DataFrame, threshold: float = 0.87
) -> pd.Series:
    """VX2: stdev of VIX daily returns over 5 rows divided by the same over
    20 rows is below ``threshold`` (compression).

    Berg context: "this is an extreme in VIX ... it's standard deviation of VIX.
    I don't give you the exact formula, but let's just say it's 5 days over 20
    days of VIX ... less than 0.87 means it's an extreme downward movement in
    VIX, which people are saying is bearish."

    Confidence: MEDIUM (0.60). Berg explicitly withheld the exact formula.
    Return-stdev is used as the most natural interpretation; level-stdev or
    absolute-stdev are also plausible.
    """
    _require_columns(vix, ("close",))
    daily_return = vix["close"].pct_change()
    short_vol = daily_return.rolling(window=5, min_periods=5).std()
    long_vol = daily_return.rolling(window=20, min_periods=20).std()
    # A zero denominator is turned into NaN so the row evaluates to False.
    vol_ratio = short_vol / long_vol.replace(0, np.nan)
    return _bool_series(vol_ratio.lt(threshold), vix.index)
# ============================================================================
# Trend / regime atoms (T*)
# ============================================================================
def t1_below_250d_sma(prices: pd.DataFrame) -> pd.Series:
    """T1: close is below its 250-row simple moving average (downtrend filter).

    Berg context: "the S&P is below its 250-day moving average, meaning that
    it's been in a downtrend ... we're using this as a filter."

    Rows without a full 250-row window are False.

    Confidence: HIGH (0.99). Standard SMA.
    """
    _require_columns(prices, ("close",))
    close = prices["close"]
    long_average = close.rolling(window=250, min_periods=250).mean()
    return _bool_series(close.lt(long_average), prices.index)
def t2_days_holding_low(prices: pd.DataFrame, lookback: int = 252) -> pd.Series:
    """T2: number of days the most recent swing low has held (without violation).

    Returns an INT Series (not bool) — combos consume the count and threshold it.

    Berg context: "we count days off a low. Market's down 10%. It held its low
    for 1 day, 2 days, 3 days, 4 days, 7 days. In this case, S&P down 5% and
    held its low for 7 days."

    Algorithm:
    1. The "swing low" is the minimum close over the trailing `lookback` rows.
    2. The result is the number of rows since that minimum was last touched,
       i.e. the count of consecutive rows that closed above it.
    3. On a row whose close IS the window minimum, the count is 0.

    The count is measured from the swing low itself. The earlier running
    counter had three faults: it reported 1..9 during warm-up, it counted from
    the first row whenever the low preceded the warm-up boundary, and it kept
    counting from a stale low after that low left the window.

    Args:
        prices: frame with a 'close' column.
        lookback: trailing window length in rows.

    Returns:
        Integer Series named "days_holding_low" on ``prices.index``. Rows with
        fewer than min(10, lookback) valid closes in the window are 0.

    Confidence: MEDIUM (0.70). Multiple valid definitions of "swing low" exist;
    we use rolling min as the most stable. Alternative: pivot-low detection.
    """
    _require_columns(prices, ("close",))
    close = prices["close"]

    def _rows_since_window_low(window: np.ndarray) -> float:
        # Reversed so position 0 is today; argmin then returns the distance
        # back to the MOST RECENT touch of the minimum. NaN closes are skipped.
        return float(np.nanargmin(window[::-1]))

    # min() keeps min_periods <= window so short lookbacks do not raise.
    hold_days = close.rolling(lookback, min_periods=min(10, lookback)).apply(
        _rows_since_window_low, raw=True
    )
    return hold_days.fillna(0).astype(int).rename("days_holding_low")
def t2_held_low_for_at_least(
    prices: pd.DataFrame, n_days: int, lookback: int = 252
) -> pd.Series:
    """T2 as a boolean: the swing low has held for at least ``n_days``.

    Args:
        prices: frame with a 'close' column.
        n_days: minimum hold count required for the row to be True.
        lookback: window passed through to ``t2_days_holding_low`` (default
            252, the value that was previously implied).

    Returns:
        Boolean Series on ``prices.index``.
    """
    hold_count = t2_days_holding_low(prices, lookback=lookback)
    return _bool_series(hold_count >= n_days, prices.index)
# ============================================================================
# Breadth atoms (B*) — partial; some require external data
# ============================================================================
def b6_new_n_day_low(prices: pd.DataFrame, n: int = 60) -> pd.Series:
    """B6: today's close is the lowest close of the trailing ``n`` rows.

    Berg context: "we want the S&P to make at least a new 60-day low."

    The window includes today and must be full; earlier rows are False.

    Confidence: HIGH (0.95).
    """
    _require_columns(prices, ("close",))
    close = prices["close"]
    window_floor = close.rolling(window=n, min_periods=n).min()
    return _bool_series(close.le(window_floor), prices.index)
def b_ad_ratio_at_ath(
    prices: pd.DataFrame,
    ad_5d_ratio: pd.Series,
    threshold: float = 0.87,
    ath_lookback: int = 252,
    ath_proximity_days: int = 5,
) -> pd.Series:
    """B_AD_5D_087: 5-day A/D ratio ≤ ``threshold`` while the index closed at
    its trailing high on at least one of the last ``ath_proximity_days`` rows.

    Source: Berg X — flags "breadth divergence at peak" — a topping precondition.

    Note that the "all-time high" here is the rolling maximum close over
    ``ath_lookback`` rows, not a true since-inception high.

    Args:
        prices: OHLCV of the index (S&P or composite).
        ad_5d_ratio: pre-computed 5-day advancers/decliners ratio supplied by
            the caller, indexed the same as ``prices``.
        threshold: A/D ratio ceiling for "weak breadth" (default 0.87).
        ath_lookback: trailing window defining the high (252 = 1y).
        ath_proximity_days: how many rows back a touch of the high still
            counts as "at peak".

    Confidence: MEDIUM-HIGH (0.75). Direction and threshold quoted by Berg;
    "near ATH" framing requires interpretation (5-day proximity is used).
    """
    _require_columns(prices, ("close",))
    close = prices["close"]
    trailing_high = close.rolling(ath_lookback, min_periods=ath_lookback).max()
    gap_to_high = close / trailing_high - 1.0
    # The gap is 0 on a row that closes at the trailing high; the small
    # negative tolerance absorbs floating-point noise.
    closest_recent_gap = gap_to_high.rolling(ath_proximity_days, min_periods=1).max()
    touched_high_recently = closest_recent_gap >= -1e-9
    breadth_is_weak = ad_5d_ratio <= threshold
    return _bool_series(touched_high_recently & breadth_is_weak, prices.index)
def b_da_extreme_threshold(
    decliners_to_advancers: pd.Series, threshold: float = 50.0
) -> pd.Series:
    """B_DA_SP600_50_1: daily decliners/advancers ratio is at or above ``threshold``.

    Source: Berg X 2026 — "S&P 600 closed 50-to-1 declining/advancing — 18 cases
    since 1995, 100% higher 120 days later."

    The caller supplies the D/A ratio series (e.g. for the S&P 600 small-cap
    index); this function only thresholds it. 50.0 is the most-cited level,
    but lower thresholds (10:1, 20:1) also have history.

    Confidence: HIGH (0.85). Threshold + outcome verbatim; data source explicit.
    """
    is_extreme = decliners_to_advancers.ge(threshold)
    return _bool_series(is_extreme, decliners_to_advancers.index)
def t_rin_extreme(
    trin: pd.Series,
    panic_threshold: float = 12.5,
    rare_panic_threshold: float = 15.5,
    use_rare: bool = False,
) -> pd.Series:
    """T_RIN_HI: TRIN is at or above the selected panic level.

    Source: Berg X / Forward Guidance — cites TRIN 15.50 on 2020-03-12 and
    TRIN 12.52 in Oct 2008 as historic panic prints.
    TRIN = (Advances/Declines) / (Up Volume / Down Volume).
    The caller supplies the TRIN series; this function only thresholds it.

    Args:
        trin: TRIN intraday-close or daily-close series.
        panic_threshold: 12.5 (more frequent capitulation).
        rare_panic_threshold: 15.5 (generational extreme).
        use_rare: pick ``rare_panic_threshold`` when True, otherwise
            ``panic_threshold``.

    Confidence: HIGH (0.85). Threshold values quoted directly by Berg with
    historical anchor dates.
    """
    if use_rare:
        cutoff = rare_panic_threshold
    else:
        cutoff = panic_threshold
    return _bool_series(trin.ge(cutoff), trin.index)
def b7_new_n_day_closing_high(prices: pd.DataFrame, n: int = 30) -> pd.Series:
    """B7: today's close is the highest close of the trailing ``n`` rows.

    Berg context (C6): "S&P has closed at a recovery high, which is a 30-day new high."
    Berg context (C9): "the S&P 500 closed at a 2-year high on a closing basis."

    The default of 30 covers the recovery-high case; use n=504 for a 2-year
    high. The window includes today and must be full.

    Confidence: HIGH (0.95).
    """
    _require_columns(prices, ("close",))
    close = prices["close"]
    window_ceiling = close.rolling(window=n, min_periods=n).max()
    return _bool_series(close.ge(window_ceiling), prices.index)
# ============================================================================
# Atom registry — single source of truth for combos / dashboard
# ============================================================================
# Maps atom ID -> AtomSpec. Each key matches the `id` field of its spec.
# Notes for readers:
#   * `required_columns` names the DataFrame columns the detector reads. The
#     "__external_*__" entries are placeholders for caller-supplied series;
#     a frame lacking such a column fails evaluate_all's column check, so
#     those atoms are skipped there and must be called directly.
#   * T2's detector returns an integer count, not a boolean Series.
BERG_ATOM_REGISTRY: dict[str, AtomSpec] = {
    # ---- Volume ----
    "V1": AtomSpec(
        id="V1", name="5d Volume 200d High", category="volume",
        confidence=0.95, confidence_level="HIGH",
        confidence_rationale="Pure rolling-max calculation; Berg's verbal description is exact.",
        source_quote='"the NDX generated its highest 5-day volume in 200 days"',
        required_columns=("volume",),
        func=v1_5d_volume_at_200d_high,
    ),
    "V2": AtomSpec(
        id="V2", name="5d Avg Volume 375d High Within 20d", category="volume",
        confidence=0.90, confidence_level="HIGH",
        confidence_rationale="Two-stage rolling max; Berg specifies window and lookback.",
        source_quote='"5-day average NYSE volume was the highest in 375 days ... within the past 20 days"',
        required_columns=("volume",),
        func=v2_5d_avg_volume_375d_high_within_20d,
    ),
    "V5": AtomSpec(
        id="V5", name="Signal Day Volume Up 20%", category="volume",
        confidence=0.95, confidence_level="HIGH",
        confidence_rationale="Direct verbal threshold.",
        source_quote='"the S&P was up on the day and volume increased by 20%"',
        required_columns=("close", "volume"),
        func=v5_signal_day_volume_up_20pct,
    ),
    # ---- Price momentum ----
    "P1": AtomSpec(
        id="P1", name="3d Decline ≥7%", category="momentum",
        confidence=0.95, confidence_level="HIGH",
        confidence_rationale="Direct pct_change with explicit threshold.",
        source_quote='"the Nasdaq declined 7% in 3 days"',
        required_columns=("close",),
        func=p1_3d_decline_at_least_7pct,
    ),
    "P2": AtomSpec(
        id="P2", name="1d Gain is 250d Max", category="momentum",
        confidence=0.95, confidence_level="HIGH",
        confidence_rationale="Rolling-max rank, no ambiguity.",
        source_quote='"S&P had its highest one-day gain in 250 days"',
        required_columns=("close",),
        func=p2_1d_gain_is_250d_max,
    ),
    "P3": AtomSpec(
        id="P3", name="10d Gain is 180d Max", category="momentum",
        confidence=0.95, confidence_level="HIGH",
        confidence_rationale="Direct rolling-max rank.",
        source_quote='"Nasdaq generated its greatest 10-day gain in 180 trading days"',
        required_columns=("close",),
        func=p3_10d_gain_is_180d_max,
    ),
    "P4": AtomSpec(
        id="P4", name="10d ROC is 180d Max (companion)", category="momentum",
        confidence=0.65, confidence_level="MEDIUM",
        confidence_rationale="Berg cited as distinct from P3 in same combo; possibly different index.",
        source_quote='"S&P 500 also generated its greatest 10-day change high rate of change in 180 trading days"',
        required_columns=("close",),
        func=p4_10d_roc_is_180d_max,
    ),
    "P5": AtomSpec(
        id="P5", name="10d ROC ≥20% (SOX)", category="momentum",
        confidence=0.95, confidence_level="HIGH",
        confidence_rationale="Direct threshold on pct_change(10).",
        source_quote='"SOX 10-day rate of change was above 20%"',
        required_columns=("close",),
        func=p5_10d_roc_at_least_20pct,
    ),
    "P6": AtomSpec(
        id="P6", name="Drawdown from 252d High", category="momentum",
        confidence=0.90, confidence_level="HIGH",
        confidence_rationale="Standard drawdown; Berg gave multiple thresholds across combos.",
        source_quote='"S&P declined 9% and held this low" / "down 5%" / "down 13%"',
        required_columns=("close",),
        func=p6_drawdown_from_252d_high,
    ),
    "P6_HIST": AtomSpec(
        id="P6_HIST", name="Drawdown ≤threshold reached within lookback (historical)",
        category="momentum",
        confidence=0.90, confidence_level="HIGH",
        confidence_rationale="Historical-anywhere variant of P6; matches Berg's 'declined X%' verbal usage.",
        source_quote='"the market declined 10%, held the low for 19 days" — drawdown is regime, not same-bar.',
        required_columns=("close",),
        func=p6_drawdown_reached_within,
    ),
    "P7": AtomSpec(
        id="P7", name="Recovery Thrust into New High", category="momentum",
        confidence=0.70, confidence_level="MEDIUM",
        confidence_rationale="Multi-part reconstruction; Berg's 'from last 10% decline' is interpretable.",
        source_quote='"S&P gained 6% from its last 10% decline into a new recovery high"',
        required_columns=("close",),
        func=p7_recovery_thrust,
    ),
    "P8": AtomSpec(
        id="P8", name="Single-Day Gain ≥9%", category="momentum",
        confidence=0.98, confidence_level="HIGH",
        confidence_rationale="Trivial pct_change threshold.",
        source_quote='"both the Nasdaq and the S&P gained more than 9% on the day"',
        required_columns=("close",),
        func=p8_single_day_gain_at_least,
    ),
    "P_ROC74": AtomSpec(
        id="P_ROC74", name="5d ROC ≥7.4% (bull-market trigger)", category="momentum",
        confidence=0.90, confidence_level="HIGH",
        confidence_rationale="Threshold (7.4%) and outcome (bull-market initiation since 1928) quoted verbatim; pure pct_change(5).",
        source_quote='"Every bull market since 1928 has begun with a 5-day move of at least 7.4% on the S&P 500" (Forward Guidance Feb 2024)',
        required_columns=("close",),
        func=p_roc74_5d_bull_trigger,
    ),
    # ---- Streaks ----
    "S1": AtomSpec(
        id="S1", name="≥10 Consecutive Up Days (VIX)", category="streak",
        confidence=0.98, confidence_level="HIGH",
        confidence_rationale="Pure consecutive-day counter.",
        source_quote='"to be up 10 days in a row was 0.01% of the time"',
        required_columns=("close",),
        func=s1_consecutive_up_days,
    ),
    "S2": AtomSpec(
        id="S2", name="≥10 Consecutive Down Days (VIX)", category="streak",
        confidence=0.98, confidence_level="HIGH",
        confidence_rationale="Pure consecutive-day counter.",
        source_quote='"to be down 10 days in a row happened 0.04% of the time"',
        required_columns=("close",),
        func=s2_consecutive_down_days,
    ),
    "S5": AtomSpec(
        id="S5", name="Up 8 of 9 Days (Russell)", category="streak",
        confidence=0.95, confidence_level="HIGH",
        confidence_rationale="Direct rolling sum.",
        source_quote='"the Russell was up 8 out of 9 days"',
        required_columns=("close",),
        func=s5_up_n_of_m_days,
    ),
    "S6": AtomSpec(
        id="S6", name="Up 16 of 19 Days (Nasdaq)", category="streak",
        confidence=0.95, confidence_level="HIGH",
        confidence_rationale="Specific case of n-of-m up days.",
        source_quote='"Nasdaq was up 16 out of those 19 days"',
        required_columns=("close",),
        func=s6_up_16_of_19_days,
    ),
    # ---- Volatility ----
    "VX1": AtomSpec(
        id="VX1", name="VIX Intraday Range 45-60", category="volatility",
        confidence=0.70, confidence_level="MEDIUM",
        confidence_rationale="Berg said 'traded between 45 and 60'; could mean close, range, or intraday H/L.",
        source_quote='"the VIX traded between 45 and 60"',
        required_columns=("high", "low"),
        func=vx1_vix_in_range,
    ),
    "VX2": AtomSpec(
        id="VX2", name="VIX 5d/20d Stdev Ratio <0.87", category="volatility",
        confidence=0.60, confidence_level="MEDIUM",
        confidence_rationale="Berg explicitly: 'I don't give you the exact formula' — return-stdev is our reading.",
        source_quote='"5 days over 20 days of VIX ... less than 0.87 means an extreme downward movement"',
        required_columns=("close",),
        func=vx2_vix_5d_20d_stdev_ratio,
    ),
    "VX_VIX_5_30": AtomSpec(
        id="VX_VIX_5_30", name="VIX 5d/30d Mean Ratio (compression/expansion)", category="volatility",
        confidence=0.70, confidence_level="MEDIUM",
        confidence_rationale="Direction and rough thresholds match Berg's X posts; exact cutoffs reconstructed.",
        source_quote='Berg X posts 2025-2026 referencing "VIX 5/30 ratio" extremes',
        required_columns=("close",),
        func=vx_vix_5d_30d_avg_ratio,
    ),
    # ---- Trend / regime ----
    "T1": AtomSpec(
        id="T1", name="Close <250d SMA (downtrend filter)", category="trend",
        confidence=0.99, confidence_level="HIGH",
        confidence_rationale="Standard SMA filter; Berg specifies as filter only.",
        source_quote='"S&P is below its 250-day moving average, meaning that it\'s been in a downtrend"',
        required_columns=("close",),
        func=t1_below_250d_sma,
    ),
    # T2 is the count-valued atom: its func returns days held, not True/False.
    "T2": AtomSpec(
        id="T2", name="Days Holding Low (count)", category="trend",
        confidence=0.70, confidence_level="MEDIUM",
        confidence_rationale="Multiple valid 'swing low' definitions; rolling-min chosen for stability.",
        source_quote='"we count days off a low ... held its low for 1, 2, 3, 4, 7, 9, 19 days"',
        required_columns=("close",),
        func=t2_days_holding_low,
    ),
    # ---- Breadth (price-derived) ----
    "B6": AtomSpec(
        id="B6", name="New N-Day Low (default 60)", category="breadth",
        confidence=0.95, confidence_level="HIGH",
        confidence_rationale="Rolling min; Berg specifies 60d.",
        source_quote='"the S&P to make at least a new 60-day low"',
        required_columns=("close",),
        func=b6_new_n_day_low,
    ),
    "B7": AtomSpec(
        id="B7", name="New N-Day Closing High", category="breadth",
        confidence=0.95, confidence_level="HIGH",
        confidence_rationale="Rolling max; Berg uses 30d (recovery) and 504d (2-year breakout).",
        source_quote='"S&P has closed at a recovery high, which is a 30-day new high" / "2-year high on a closing basis"',
        required_columns=("close",),
        func=b7_new_n_day_closing_high,
    ),
    # ---- Breadth (need caller-supplied external series) ----
    "B_AD_5D_087": AtomSpec(
        id="B_AD_5D_087", name="NYSE 5d A/D ≤0.87 at ATH (breadth divergence at peak)",
        category="breadth",
        confidence=0.75, confidence_level="MEDIUM",
        confidence_rationale="Threshold (0.87) and direction quoted; 'at ATH' framing requires 5d-proximity interpretation.",
        source_quote='Berg X — breadth-divergence-at-peak precondition for tops',
        required_columns=("close", "__external_ad_5d_ratio__"),
        func=b_ad_ratio_at_ath,
    ),
    "B_DA_SP600_50_1": AtomSpec(
        id="B_DA_SP600_50_1", name="S&P 600 50-to-1 Decliners/Advancers", category="breadth",
        confidence=0.85, confidence_level="HIGH",
        confidence_rationale="Threshold (50:1) and outcome (18 cases since 1995, 100% up 120d) quoted verbatim.",
        source_quote='"S&P 600 closed 50-to-1 declining/advancing — 18 cases since 1995, 100% higher 120 days later"',
        required_columns=("__external_da_ratio__",),
        func=b_da_extreme_threshold,
    ),
    "T_RIN_HI": AtomSpec(
        id="T_RIN_HI", name="NYSE TRIN ≥12.5 (panic) or ≥15.5 (rare panic)", category="breadth",
        confidence=0.85, confidence_level="HIGH",
        confidence_rationale="Thresholds (12.5 / 15.5) anchored to historic dates (2020-03-12: 15.50, Oct 2008: 12.52).",
        source_quote='Berg cites TRIN 15.50 (Mar 2020) and 12.52 (Oct 2008) as generational panic prints',
        required_columns=("__external_trin__",),
        func=t_rin_extreme,
    ),
}
# ============================================================================
# Public helpers
# ============================================================================
def list_atoms(min_confidence: float = 0.0) -> list[AtomSpec]:
    """List registered atoms whose confidence is at least ``min_confidence``.

    The result is ordered by atom ID (plain string ordering).
    """
    eligible = [
        spec
        for spec in BERG_ATOM_REGISTRY.values()
        if spec.confidence >= min_confidence
    ]
    eligible.sort(key=lambda spec: spec.id)
    return eligible
def evaluate_all(
    instrument_panel: Mapping[str, pd.DataFrame],
    atom_ids: list[str] | None = None,
) -> pd.DataFrame:
    """Evaluate atoms across an instrument panel.

    Every requested atom is run against every instrument whose frame has all
    of the atom's ``required_columns``. Atoms are called with the frame only,
    so default parameters apply.

    Args:
        instrument_panel: dict mapping instrument symbol (e.g., 'SPY', 'QQQ', 'VIX')
            to OHLCV DataFrame.
        atom_ids: optional subset; default = all atoms.

    Returns:
        long-form DataFrame with columns:
            date, atom_id, instrument, fired (bool)
        One row per (atom, instrument, date) on which the atom fired. The four
        columns are present even when nothing fired.

    Raises:
        KeyError: if ``atom_ids`` contains an ID that is not registered.

    Notes:
        * Atoms whose output is not boolean (T2 returns a day count) are
          skipped: using a count as a mask is meaningless, and callers are
          expected to threshold such atoms themselves.
        * A failure inside one atom/instrument pair is reported and does not
          abort the rest of the run (deliberate best-effort).
    """
    columns = ["date", "atom_id", "instrument", "fired"]
    if atom_ids is None:
        atom_ids = list(BERG_ATOM_REGISTRY.keys())
    frames = []
    for atom_id in atom_ids:
        spec = BERG_ATOM_REGISTRY[atom_id]
        for instrument, df in instrument_panel.items():
            try:
                if not all(c in df.columns for c in spec.required_columns):
                    continue
                result = spec.func(df)
                if not pd.api.types.is_bool_dtype(result):
                    # Count-valued atom; cannot be used as a fired/not-fired mask.
                    continue
                fired_dates = result.index[result.to_numpy()]
                if len(fired_dates) == 0:
                    continue
                frames.append(pd.DataFrame({
                    "date": fired_dates, "atom_id": atom_id,
                    "instrument": instrument, "fired": True,
                }))
            except Exception as e:
                # In production, route through trackError
                print(f"[berg_atoms] atom {atom_id} on {instrument} failed: {e}")
                continue
    if not frames:
        # Keep the documented schema so downstream column access never fails.
        return pd.DataFrame(columns=columns)
    return pd.concat(frames, ignore_index=True)[columns]