stock/indicators.py
2025-08-15 12:32:27 +02:00

126 lines
4.7 KiB
Python

# indicators.py
from __future__ import annotations
import math
import numpy as np
import pandas as pd
# ===== Helper =====
def _as_1d(a) -> np.ndarray:
return np.asarray(a, dtype=float).ravel()
# ===== Indicators =====
def sma(arr: np.ndarray, period: int) -> np.ndarray:
    """Simple moving average over a trailing window of *period* samples.

    Returns an array with the same length as the flattened input. The first
    ``period - 1`` entries are NaN because no complete window exists yet;
    when the input is shorter than *period* every entry is NaN. Empty input
    yields an empty array.
    """
    values = _as_1d(arr)
    count = len(values)
    if count == 0:
        return np.array([], float)
    if count < period:
        return np.full(count, np.nan)

    # Equal weights that sum to one: convolving with them averages a window.
    kernel = np.ones(period) / period
    averaged = np.convolve(values, kernel, mode="valid")

    # "valid" mode only emits full windows, so the warm-up slots stay NaN.
    result = np.full(count, np.nan)
    result[period - 1:] = averaged
    return result
def ema(arr: np.ndarray, period: int) -> np.ndarray:
    """Exponential moving average with smoothing factor ``2 / (period + 1)``.

    The average is seeded with the first sample. Whenever the running value
    is NaN (at the start, or after a NaN sample has poisoned it) the next
    sample is taken as the new seed. The result has the same length as the
    flattened input; empty input yields an empty array.
    """
    values = _as_1d(arr)
    result = np.full(len(values), np.nan, float)
    if len(values) == 0:
        return result

    alpha = 2 / (period + 1)
    running = np.nan
    for idx, sample in enumerate(values):
        if math.isnan(running):
            # No usable history: restart the average from this sample.
            running = sample
        else:
            running = sample * alpha + running * (1 - alpha)
        result[idx] = running
    return result
def rsi(arr: np.ndarray, period: int = 14) -> np.ndarray:
    """Relative Strength Index built on this module's ``ema`` smoothing.

    Bar-to-bar changes are split into gains and losses, each side is
    smoothed with ``ema(..., period)``, and the ratio is mapped to the
    0..100 scale. Returns an all-NaN array when fewer than ``period + 1``
    samples are supplied and an empty array for empty input.
    """
    prices = _as_1d(arr)
    count = len(prices)
    if count == 0:
        return np.array([], float)
    if count < period + 1:
        return np.full(count, np.nan)

    # Prepending the first price makes the first change zero and keeps the
    # output aligned with the input.
    change = np.diff(prices, prepend=prices[0])
    gains = np.where(change > 0, change, 0.0)
    losses = np.where(change < 0, -change, 0.0)

    avg_gain = ema(gains, period)
    avg_loss = ema(losses, period)

    # The small constant avoids a division by zero when there are no losses.
    strength = avg_gain / (avg_loss + 1e-9)
    return 100 - (100 / (1 + strength))
def atr(H: np.ndarray, L: np.ndarray, C: np.ndarray, period: int = 14) -> np.ndarray:
    """Average True Range: ``ema`` of the per-bar true range.

    True range is the largest of high-low, |high - previous close| and
    |low - previous close|. The first bar has no previous close, so its
    true range is simply high-low. Returns an all-NaN array when fewer than
    ``period + 1`` closes are supplied and an empty array for empty input.
    """
    high, low, close = _as_1d(H), _as_1d(L), _as_1d(C)
    count = len(close)
    if count == 0:
        return np.array([], float)
    if count < period + 1:
        return np.full(count, np.nan)

    prev_close = np.roll(close, 1)
    candidates = [high - low, np.abs(high - prev_close), np.abs(low - prev_close)]
    true_range = np.maximum.reduce(candidates)

    # np.roll wrapped the last close into slot 0; discard that comparison.
    true_range[0] = high[0] - low[0]
    return ema(true_range, period)
def macd(arr: np.ndarray, fast=12, slow=26, signal=9):
    """MACD line, signal line and histogram.

    The MACD line is ``ema(arr, fast) - ema(arr, slow)``; the signal line is
    ``ema`` of the MACD line over *signal* periods; the histogram is their
    difference. Returns ``(line, signal_line, histogram)``.
    """
    fast_avg = ema(arr, fast)
    slow_avg = ema(arr, slow)
    macd_line = fast_avg - slow_avg
    signal_line = ema(macd_line, signal)
    histogram = macd_line - signal_line
    return macd_line, signal_line, histogram
def stoch_kd(H: np.ndarray, L: np.ndarray, C: np.ndarray, period=14, smooth=3):
    """Stochastic oscillator: returns ``(%K, %D)``.

    %K places the close within the rolling high/low range of the last
    *period* bars on a 0..100 scale; %D is the rolling mean of %K over
    *smooth* bars. Both rolling windows use ``min_periods=1``, so early bars
    are computed from however many bars are available. Returns two all-NaN
    arrays when fewer than *period* closes are supplied and two empty arrays
    for empty input.
    """
    high, low, close = _as_1d(H), _as_1d(L), _as_1d(C)
    count = len(close)
    if count == 0:
        empty = np.array([], float)
        return empty, empty
    if count < period:
        return np.full(count, np.nan), np.full(count, np.nan)

    window_low = pd.Series(low).rolling(period, min_periods=1).min().to_numpy()
    window_high = pd.Series(high).rolling(period, min_periods=1).max().to_numpy()

    # The small constant keeps the ratio finite when the range is flat.
    price_range = window_high - window_low + 1e-9
    percent_k = 100 * (close - window_low) / price_range
    percent_d = pd.Series(percent_k).rolling(smooth, min_periods=1).mean().to_numpy()
    return percent_k, percent_d
def bollinger(arr: np.ndarray, period=20, dev=2.0):
    """Bollinger Bands: returns ``(middle, upper, lower)``.

    The middle band is the rolling mean over *period* samples; the outer
    bands sit *dev* population standard deviations (``ddof=0``) above and
    below it. Rolling windows use ``min_periods=1``, so early samples are
    computed from however many values are available. Empty input yields
    three empty arrays.
    """
    values = _as_1d(arr)
    if len(values) == 0:
        empty = np.array([], float)
        return empty, empty, empty

    windows = pd.Series(values).rolling(period, min_periods=1)
    middle = windows.mean().to_numpy()
    spread = windows.std(ddof=0).to_numpy()

    band_offset = dev * spread
    return middle, middle + band_offset, middle - band_offset
def adx_val(H: np.ndarray, L: np.ndarray, C: np.ndarray, period=14):
    """Average Directional Index together with its +DI / -DI components.

    Directional movement and true range are smoothed with this module's
    ``ema`` over *period* bars, combined into +DI and -DI, then into DX, and
    ADX is the ``ema`` of DX.

    The first bar has no predecessor, so its directional movement is zero
    and its true range is just high-low. Every output value therefore
    depends only on bars at or before its own index.

    Returns ``(adx, pdi, mdi)``, each the same length as the flattened
    closes; three empty arrays when the input is empty.
    """
    high, low, close = _as_1d(H), _as_1d(L), _as_1d(C)
    n = len(close)
    if n == 0:
        empty = np.array([], float)
        return empty, empty, empty

    up_move = high - np.roll(high, 1)
    down_move = np.roll(low, 1) - low
    # np.roll wraps the LAST bar into slot 0. Left alone, bar 0 would be
    # compared against the final bar of the series, and because ema() seeds
    # from its first sample that future value would leak into the start of
    # every output. Bar 0 has no previous bar, so it has no movement.
    up_move[0] = 0.0
    down_move[0] = 0.0

    # Only the dominant, strictly positive move counts as directional.
    plus_dm = np.where((up_move > down_move) & (up_move > 0), up_move, 0.0)
    minus_dm = np.where((down_move > up_move) & (down_move > 0), down_move, 0.0)

    prev_close = np.roll(close, 1)
    bar_range = high - low
    true_range = np.maximum.reduce(
        [bar_range, np.abs(high - prev_close), np.abs(low - prev_close)]
    )
    # Same wrap-around issue as above: bar 0 has no previous close.
    true_range[0] = bar_range[0]

    smoothed_tr = ema(true_range, period)
    # The small constants keep the ratios finite when a denominator is zero.
    pdi = 100 * ema(plus_dm, period) / (smoothed_tr + 1e-9)
    mdi = 100 * ema(minus_dm, period) / (smoothed_tr + 1e-9)
    dx = 100 * np.abs(pdi - mdi) / (pdi + mdi + 1e-9)
    adx = ema(dx, period)
    return adx, pdi, mdi
def supertrend(H: np.ndarray, L: np.ndarray, C: np.ndarray, period=10, mult=3.0):
    """Simplified SuperTrend line built from ATR bands around the bar midpoint.

    For every bar an upper band ``hl2 + mult * ATR`` and a lower band
    ``hl2 - mult * ATR`` are computed from that bar alone; no band value is
    carried over between bars. Bar ``i`` takes the upper band when the
    previous close is at or below the previous line value (or the previous
    midpoint if that value is NaN), and the lower band otherwise.

    Index 0 is always NaN. Empty input yields an empty array.
    """
    high, low, close = _as_1d(H), _as_1d(L), _as_1d(C)
    count = len(close)
    if count == 0:
        return np.array([], float)

    band_width = mult * atr(high, low, close, period)
    midpoint = (high + low) / 2.0
    upper = midpoint + band_width
    lower = midpoint - band_width

    line = np.full(count, np.nan)
    for i in range(1, count):
        reference = line[i - 1]
        if np.isnan(reference):
            # No line value yet: fall back to the previous bar's midpoint.
            reference = midpoint[i - 1]
        if close[i - 1] <= reference:
            line[i] = upper[i]
        else:
            line[i] = lower[i]
    return line
# ===== Pipeline: add_indicators =====
def add_indicators(df: pd.DataFrame, min_bars: int = 200) -> pd.DataFrame:
    """Attach the module's indicator columns to *df* and return it.

    Reads the ``close``, ``high`` and ``low`` columns and writes ``ema20``,
    ``ema50``, ``rsi14``, ``atr14``, ``macd``, ``macd_sig``, ``stoch_k``,
    ``stoch_d``, ``bb_mid``, ``bb_up``, ``bb_dn``, ``adx``, ``pdi``, ``mdi``
    and ``supertrend``. The MACD histogram is computed but not stored.

    The frame is modified in place; the returned object is the same *df*.

    Raises:
        ValueError: if *df* has fewer than *min_bars* rows.
    """
    if len(df) < min_bars:
        raise ValueError(f"Too few bars: {len(df)} < {min_bars}")

    close = df["close"].to_numpy()
    high = df["high"].to_numpy()
    low = df["low"].to_numpy()

    # Trend and momentum.
    df["ema20"] = ema(close, 20)
    df["ema50"] = ema(close, 50)
    df["rsi14"] = rsi(close, 14)
    df["atr14"] = atr(high, low, close, 14)

    macd_line, macd_signal, _ = macd(close)
    df["macd"] = macd_line
    df["macd_sig"] = macd_signal

    # Oscillator and volatility bands.
    percent_k, percent_d = stoch_kd(high, low, close, 14, 3)
    df["stoch_k"] = percent_k
    df["stoch_d"] = percent_d

    middle, upper, lower = bollinger(close, 20, 2.0)
    df["bb_mid"] = middle
    df["bb_up"] = upper
    df["bb_dn"] = lower

    # Directional strength and trailing line.
    adx_line, plus_di, minus_di = adx_val(high, low, close, 14)
    df["adx"] = adx_line
    df["pdi"] = plus_di
    df["mdi"] = minus_di

    df["supertrend"] = supertrend(high, low, close, 10, 3.0)
    return df