# indicators.py
# Technical-analysis indicators implemented on top of numpy / pandas, plus a
# pipeline helper that attaches them to an OHLC DataFrame.
from __future__ import annotations

import math

import numpy as np
import pandas as pd


# ===== Helper =====
def _as_1d(a) -> np.ndarray:
    """Return *a* converted to a flattened float ndarray."""
    return np.asarray(a, dtype=float).ravel()


# ===== Indicators =====
def sma(arr: np.ndarray, period: int) -> np.ndarray:
    """Return the simple moving average of *arr* over *period* samples.

    The result has the same length as the input. The first ``period - 1``
    entries are NaN; if the input is shorter than *period* every entry is NaN.
    """
    arr = _as_1d(arr)
    n = len(arr)
    if n == 0:
        return np.array([], float)
    if n < period:
        return np.full(n, np.nan)
    weights = np.ones(period) / period
    out = np.convolve(arr, weights, mode="valid")
    # Left-pad with NaN so the output stays aligned with the input.
    return np.concatenate([np.full(period - 1, np.nan), out])


def ema(arr: np.ndarray, period: int) -> np.ndarray:
    """Return the exponential moving average of *arr*.

    Uses the smoothing factor ``2 / (period + 1)`` and seeds the average with
    the first sample. Whenever the running value is NaN (at the start, or
    right after a NaN sample has poisoned it) the next sample re-seeds it.
    """
    arr = _as_1d(arr)
    n = len(arr)
    out = np.full(n, np.nan, float)
    if n == 0:
        return out
    k = 2 / (period + 1)
    e = np.nan
    for i, x in enumerate(arr):
        # e is always a float here, so a plain isnan check is sufficient.
        e = x if math.isnan(e) else x * k + e * (1 - k)
        out[i] = e
    return out


def rsi(arr: np.ndarray, period: int = 14) -> np.ndarray:
    """Return an RSI-style oscillator in the range 0..100.

    Gains and losses are smoothed with :func:`ema` (factor
    ``2 / (period + 1)``). Inputs shorter than ``period + 1`` yield all NaN.
    """
    arr = _as_1d(arr)
    n = len(arr)
    if n == 0:
        return np.array([], float)
    if n < period + 1:
        return np.full(n, np.nan)
    # prepend=arr[0] makes the first difference 0 and keeps the length at n.
    delta = np.diff(arr, prepend=arr[0])
    gains = np.where(delta > 0, delta, 0.0)
    losses = np.where(delta < 0, -delta, 0.0)
    avg_gain, avg_loss = ema(gains, period), ema(losses, period)
    # The small epsilon avoids division by zero when there are no losses.
    rs = avg_gain / (avg_loss + 1e-9)
    return 100 - (100 / (1 + rs))


def atr(H: np.ndarray, L: np.ndarray, C: np.ndarray, period: int = 14) -> np.ndarray:
    """Return the EMA-smoothed true range of the high/low/close series.

    Inputs shorter than ``period + 1`` yield all NaN.
    """
    H, L, C = _as_1d(H), _as_1d(L), _as_1d(C)
    n = len(C)
    if n == 0:
        return np.array([], float)
    if n < period + 1:
        return np.full(n, np.nan)
    pc = np.roll(C, 1)  # previous close; index 0 wraps around and is fixed below
    tr = np.maximum.reduce([H - L, np.abs(H - pc), np.abs(L - pc)])
    # The first bar has no previous close, so use its plain high-low range.
    tr[0] = H[0] - L[0]
    return ema(tr, period)


def macd(arr: np.ndarray, fast=12, slow=26, signal=9):
    """Return ``(line, signal, histogram)`` of the MACD indicator.

    ``line`` is ``ema(fast) - ema(slow)``, ``signal`` is the EMA of the line
    and ``histogram`` is their difference.
    """
    ema_fast, ema_slow = ema(arr, fast), ema(arr, slow)
    line = ema_fast - ema_slow
    sig = ema(line, signal)
    hist = line - sig
    return line, sig, hist


def stoch_kd(H: np.ndarray, L: np.ndarray, C: np.ndarray, period=14, smooth=3):
    """Return the stochastic oscillator as a ``(%K, %D)`` pair.

    %K is computed over a rolling high/low window of *period* bars (partial
    windows allowed) and %D is the rolling mean of %K over *smooth* bars.
    Inputs shorter than *period* yield two all-NaN arrays.
    """
    H, L, C = _as_1d(H), _as_1d(L), _as_1d(C)
    n = len(C)
    if n == 0:
        z = np.array([], float)
        return z, z
    if n < period:
        return np.full(n, np.nan), np.full(n, np.nan)
    lowest = pd.Series(L).rolling(period, min_periods=1).min().to_numpy()
    highest = pd.Series(H).rolling(period, min_periods=1).max().to_numpy()
    # The epsilon guards against a zero-width high/low range.
    k = 100 * (C - lowest) / (highest - lowest + 1e-9)
    d = pd.Series(k).rolling(smooth, min_periods=1).mean().to_numpy()
    return k, d


def bollinger(arr: np.ndarray, period=20, dev=2.0):
    """Return Bollinger bands as ``(middle, upper, lower)``.

    The middle band is the rolling mean and the outer bands are *dev*
    population standard deviations (``ddof=0``) away from it. Partial windows
    are allowed, so no leading NaN values are produced.
    """
    arr = _as_1d(arr)
    n = len(arr)
    if n == 0:
        z = np.array([], float)
        return z, z, z
    s = pd.Series(arr)
    ma = s.rolling(period, min_periods=1).mean().to_numpy()
    sd = s.rolling(period, min_periods=1).std(ddof=0).to_numpy()
    return ma, ma + dev * sd, ma - dev * sd


def adx_val(H: np.ndarray, L: np.ndarray, C: np.ndarray, period=14):
    """Return ``(adx, +DI, -DI)`` computed with EMA smoothing.

    The first bar has no predecessor, so its directional movement is defined
    as zero and its true range as the plain high-low range.
    """
    H, L, C = _as_1d(H), _as_1d(L), _as_1d(C)
    n = len(C)
    if n == 0:
        z = np.array([], float)
        return z, z, z
    up_move = H - np.roll(H, 1)
    down_move = np.roll(L, 1) - L
    # np.roll wraps around, so index 0 would otherwise be computed from the
    # LAST bar. Because ema() seeds on its first sample, that bogus value
    # would leak into the whole series and make results depend on future bars.
    up_move[0] = 0.0
    down_move[0] = 0.0
    up_dm = np.where((up_move > down_move) & (up_move > 0), up_move, 0.0)
    down_dm = np.where((down_move > up_move) & (down_move > 0), down_move, 0.0)
    prev_close = np.roll(C, 1)
    tr1 = H - L
    tr2 = np.abs(H - prev_close)
    tr3 = np.abs(L - prev_close)
    tr = np.maximum.reduce([tr1, tr2, tr3])
    tr[0] = tr1[0]  # no previous close for the first bar
    atr14 = ema(tr, period)
    # The epsilons avoid division by zero on flat data.
    pdi = 100 * ema(up_dm, period) / (atr14 + 1e-9)
    mdi = 100 * ema(down_dm, period) / (atr14 + 1e-9)
    dx = 100 * np.abs(pdi - mdi) / (pdi + mdi + 1e-9)
    adx = ema(dx, period)
    return adx, pdi, mdi


def supertrend(H: np.ndarray, L: np.ndarray, C: np.ndarray, period=10, mult=3.0):
    """Return a supertrend-style band series.

    For every bar after the first, the value is the upper band
    (``hl2 + mult * atr``) when the previous close is at or below the previous
    output value, and the lower band (``hl2 - mult * atr``) otherwise. When
    the previous output is NaN the previous bar's ``hl2`` is used for the
    comparison. The first element is always NaN.
    """
    H, L, C = _as_1d(H), _as_1d(L), _as_1d(C)
    n = len(C)
    if n == 0:
        return np.array([], float)
    a = atr(H, L, C, period)
    hl2 = (H + L) / 2.0
    ub = hl2 + mult * a
    lb = hl2 - mult * a
    st = np.full(n, np.nan)
    # Sequential by construction: each value depends on the previous output.
    for i in range(1, n):
        prev = st[i - 1] if not np.isnan(st[i - 1]) else hl2[i - 1]
        st[i] = ub[i] if C[i - 1] <= prev else lb[i]
    return st


# ===== Pipeline: add_indicators =====
def add_indicators(df: pd.DataFrame, min_bars: int = 200) -> pd.DataFrame:
    """Attach the indicator columns to *df* and return it.

    Reads the ``close``, ``high`` and ``low`` columns. The frame is modified
    in place; the returned object is the same DataFrame that was passed in.

    Raises:
        ValueError: if *df* has fewer than *min_bars* rows.
    """
    if len(df) < min_bars:
        raise ValueError(f"Too few bars: {len(df)} < {min_bars}")
    C, H, L = df["close"].to_numpy(), df["high"].to_numpy(), df["low"].to_numpy()

    df["ema20"] = ema(C, 20)
    df["ema50"] = ema(C, 50)
    df["rsi14"] = rsi(C, 14)
    df["atr14"] = atr(H, L, C, 14)

    m_line, m_sig, _ = macd(C)
    df["macd"] = m_line
    df["macd_sig"] = m_sig

    k, d = stoch_kd(H, L, C, 14, 3)
    df["stoch_k"] = k
    df["stoch_d"] = d

    mid, up, dn = bollinger(C, 20, 2.0)
    df["bb_mid"] = mid
    df["bb_up"] = up
    df["bb_dn"] = dn

    adx_, pdi, mdi = adx_val(H, L, C, 14)
    df["adx"] = adx_
    df["pdi"] = pdi
    df["mdi"] = mdi

    df["supertrend"] = supertrend(H, L, C, 10, 3.0)
    return df