# indicators.py
# Technical-analysis indicator helpers built on NumPy and pandas.
|
|
from __future__ import annotations
|
|
import math
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
# ===== Helper =====
|
|
def _as_1d(a) -> np.ndarray:
|
|
return np.asarray(a, dtype=float).ravel()
|
|
|
|
# ===== Indicators =====
|
|
def sma(arr: np.ndarray, period: int) -> np.ndarray:
    """Return the simple moving average of *arr* over a sliding window.

    Args:
        arr: Input values; coerced to a flat float array.
        period: Window length in samples; must be at least 1.

    Returns:
        An array with the same length as the input. The first
        ``period - 1`` entries are NaN because no full window exists yet.
        If the input is shorter than ``period`` every entry is NaN, and an
        empty input yields an empty array.

    Raises:
        ValueError: If ``period`` is less than 1 and the input is non-empty.
    """
    arr = _as_1d(arr)
    n = len(arr)
    if n == 0:
        return np.array([], float)
    if period < 1:
        # A non-positive window used to fail deep inside numpy with an
        # unrelated-looking ValueError; fail early with a clear message.
        raise ValueError(f"period must be >= 1, got {period}")
    if n < period:
        return np.full(n, np.nan)

    # Convolving with a uniform kernel in "valid" mode yields one mean per
    # full window, i.e. n - period + 1 values.
    weights = np.ones(period) / period
    averaged = np.convolve(arr, weights, mode="valid")

    # Left-pad with NaN so the output lines up index-for-index with the input.
    return np.concatenate([np.full(period - 1, np.nan), averaged])
|
|
|
|
def ema(arr: np.ndarray, period: int) -> np.ndarray:
    """Return the exponential moving average of *arr*.

    The smoothing factor is ``2 / (period + 1)``. The first sample seeds the
    average, so the output has a value from index 0 onward. A NaN sample
    produces NaN at that position, and the average is re-seeded from the
    next sample.

    Args:
        arr: Input values; coerced to a flat float array.
        period: Span used to derive the smoothing factor.

    Returns:
        An array with the same length as the input (empty for empty input).
    """
    values = _as_1d(arr)
    smoothed = np.full(len(values), np.nan, float)
    if len(values) == 0:
        return smoothed

    alpha = 2 / (period + 1)
    running = np.nan
    for idx, sample in enumerate(values):
        if math.isnan(running):
            # No usable history yet (start of series or after a NaN): re-seed.
            running = sample
        else:
            running = sample * alpha + running * (1 - alpha)
        smoothed[idx] = running
    return smoothed
|
|
|
|
def rsi(arr: np.ndarray, period: int = 14) -> np.ndarray:
    """Return a relative-strength index of *arr* on a 0-100 scale.

    Gains and losses are the positive and negative parts of the one-step
    price change; each is smoothed with ``ema`` before forming the ratio.

    Args:
        arr: Price series; coerced to a flat float array.
        period: Span passed to ``ema`` for both the gain and loss averages.

    Returns:
        An array with the same length as the input. Empty input yields an
        empty array; fewer than ``period + 1`` samples yields all NaN.
    """
    prices = _as_1d(arr)
    count = len(prices)
    if count == 0:
        return np.array([], float)
    if count < period + 1:
        return np.full(count, np.nan)

    # Prepending the first price keeps the length and makes the first change 0.
    change = np.diff(prices, prepend=prices[0])
    gains = np.where(change > 0, change, 0.0)
    losses = np.where(change < 0, -change, 0.0)

    avg_gain = ema(gains, period)
    avg_loss = ema(losses, period)

    # The small epsilon avoids division by zero when there are no losses.
    strength = avg_gain / (avg_loss + 1e-9)
    return 100 - (100 / (1 + strength))
|
|
|
|
def atr(H: np.ndarray, L: np.ndarray, C: np.ndarray, period: int = 14) -> np.ndarray:
    """Return the average true range, smoothed with ``ema``.

    The true range of a bar is the largest of: high - low,
    |high - previous close| and |low - previous close|. The first bar has no
    previous close, so its true range is simply high - low.

    Args:
        H: High values.
        L: Low values.
        C: Close values; its length determines the output length.
        period: Span passed to ``ema``.

    Returns:
        An array with one value per bar. Empty input yields an empty array;
        fewer than ``period + 1`` bars yields all NaN.
    """
    high, low, close = _as_1d(H), _as_1d(L), _as_1d(C)
    bars = len(close)
    if bars == 0:
        return np.array([], float)
    if bars < period + 1:
        return np.full(bars, np.nan)

    # Start from the plain bar range (a fresh array), then widen every bar
    # after the first by the gaps relative to the previous close.
    true_range = high - low
    prev_close = close[:-1]
    gap_up = np.abs(high[1:] - prev_close)
    gap_down = np.abs(low[1:] - prev_close)
    true_range[1:] = np.maximum(np.maximum(true_range[1:], gap_up), gap_down)

    return ema(true_range, period)
|
|
|
|
def macd(arr: np.ndarray, fast=12, slow=26, signal=9):
    """Return the MACD line, its signal line, and the histogram.

    Args:
        arr: Price series.
        fast: Span of the fast ``ema``.
        slow: Span of the slow ``ema``.
        signal: Span of the ``ema`` applied to the MACD line.

    Returns:
        A tuple ``(line, signal_line, histogram)`` where ``line`` is the fast
        EMA minus the slow EMA and ``histogram`` is ``line - signal_line``.
    """
    macd_line = ema(arr, fast) - ema(arr, slow)
    signal_line = ema(macd_line, signal)
    return macd_line, signal_line, macd_line - signal_line
|
|
|
|
def stoch_kd(H: np.ndarray, L: np.ndarray, C: np.ndarray, period=14, smooth=3):
    """Return the stochastic oscillator %K and its smoothed %D.

    %K locates the close within the rolling high/low range over ``period``
    bars, scaled to 0-100; %D is the rolling mean of %K over ``smooth`` bars.
    Both rolling windows use ``min_periods=1``, so values are produced from
    the first bar once the input has at least ``period`` bars.

    Args:
        H: High values.
        L: Low values.
        C: Close values; its length determines the output length.
        period: Look-back window for the highest high / lowest low.
        smooth: Window for the %D moving average.

    Returns:
        A tuple ``(k, d)``. Empty input yields two empty arrays; fewer than
        ``period`` bars yields two all-NaN arrays.
    """
    highs, lows, closes = _as_1d(H), _as_1d(L), _as_1d(C)
    size = len(closes)
    if size == 0:
        empty = np.array([], float)
        return empty, empty
    if size < period:
        return np.full(size, np.nan), np.full(size, np.nan)

    window_low = pd.Series(lows).rolling(period, min_periods=1).min().to_numpy()
    window_high = pd.Series(highs).rolling(period, min_periods=1).max().to_numpy()

    # The epsilon keeps the ratio finite when the window's high equals its low.
    pct_k = 100 * (closes - window_low) / (window_high - window_low + 1e-9)
    pct_d = pd.Series(pct_k).rolling(smooth, min_periods=1).mean().to_numpy()
    return pct_k, pct_d
|
|
|
|
def bollinger(arr: np.ndarray, period=20, dev=2.0):
    """Return the Bollinger middle, upper and lower bands.

    The middle band is the rolling mean over ``period`` samples; the outer
    bands sit ``dev`` population standard deviations (``ddof=0``) above and
    below it. Rolling windows use ``min_periods=1``, so every position has a
    value.

    Args:
        arr: Price series; coerced to a flat float array.
        period: Rolling window length.
        dev: Band width as a multiple of the rolling standard deviation.

    Returns:
        A tuple ``(middle, upper, lower)``; three empty arrays for empty input.
    """
    values = _as_1d(arr)
    if len(values) == 0:
        empty = np.array([], float)
        return empty, empty, empty

    series = pd.Series(values)
    middle = series.rolling(period, min_periods=1).mean().to_numpy()
    sigma = series.rolling(period, min_periods=1).std(ddof=0).to_numpy()
    upper = middle + dev * sigma
    lower = middle - dev * sigma
    return middle, upper, lower
|
|
|
|
def adx_val(H: np.ndarray, L: np.ndarray, C: np.ndarray, period=14):
    """Return the ADX together with the +DI and -DI lines.

    Directional movement, true range and the resulting DX are all smoothed
    with ``ema`` using the same ``period``.

    Args:
        H: High values.
        L: Low values.
        C: Close values; its length determines the output length.
        period: Span passed to every ``ema`` call.

    Returns:
        A tuple ``(adx, pdi, mdi)`` of arrays with one value per bar; three
        empty arrays for empty input.
    """
    H, L, C = _as_1d(H), _as_1d(L), _as_1d(C)
    n = len(C)
    if n == 0:
        z = np.array([], float)
        return z, z, z

    # Bar-to-bar moves. The first bar has no predecessor, so its move is 0.
    # (np.roll would wrap around and compare bar 0 against the LAST bar;
    # because ema() seeds on its first sample, that look-ahead value would
    # leak into every subsequent +DI / -DI / ADX value.)
    up_move = np.diff(H, prepend=H[0])
    down_move = -np.diff(L, prepend=L[0])

    # Only the dominant, positive move counts as directional movement.
    up_dm = np.where((up_move > down_move) & (up_move > 0), up_move, 0.0)
    down_dm = np.where((down_move > up_move) & (down_move > 0), down_move, 0.0)

    # True range; the wrapped previous close at index 0 is discarded by
    # overwriting the first element with the plain bar range.
    prev_close = np.roll(C, 1)
    bar_range = H - L
    tr = np.maximum.reduce(
        [bar_range, np.abs(H - prev_close), np.abs(L - prev_close)]
    )
    tr[0] = bar_range[0]

    avg_tr = ema(tr, period)
    # The epsilons keep the ratios finite when the denominator is zero.
    pdi = 100 * ema(up_dm, period) / (avg_tr + 1e-9)
    mdi = 100 * ema(down_dm, period) / (avg_tr + 1e-9)
    dx = 100 * np.abs(pdi - mdi) / (pdi + mdi + 1e-9)
    adx = ema(dx, period)
    return adx, pdi, mdi
|
|
|
|
def supertrend(H: np.ndarray, L: np.ndarray, C: np.ndarray, period=10, mult=3.0):
    """Return a SuperTrend-style line built from ATR bands around (H + L) / 2.

    For each bar after the first, the previous close is compared with the
    previous line value (or the previous bar's midpoint when that value is
    NaN). If the close is at or below it, the line takes the upper band;
    otherwise it takes the lower band. The first element is always NaN.

    Args:
        H: High values.
        L: Low values.
        C: Close values; its length determines the output length.
        period: Span passed to ``atr``.
        mult: ATR multiple used for the band offset.

    Returns:
        An array with one value per bar; empty for empty input.
    """
    high, low, close = _as_1d(H), _as_1d(L), _as_1d(C)
    bars = len(close)
    if bars == 0:
        return np.array([], float)

    band_offset = mult * atr(high, low, close, period)
    midpoint = (high + low) / 2.0
    upper = midpoint + band_offset
    lower = midpoint - band_offset

    line = np.full(bars, np.nan)
    last = np.nan  # value written on the previous iteration (NaN for bar 0)
    for i in range(1, bars):
        reference = midpoint[i - 1] if np.isnan(last) else last
        last = upper[i] if close[i - 1] <= reference else lower[i]
        line[i] = last
    return line
|
|
|
|
# ===== Pipeline: add_indicators =====
|
|
def add_indicators(df: pd.DataFrame, min_bars: int = 200) -> pd.DataFrame:
    """Compute the indicator set and attach it to *df* as new columns.

    The frame is modified in place and also returned. It must provide
    ``close``, ``high`` and ``low`` columns.

    Args:
        df: Price frame to extend.
        min_bars: Minimum number of rows required.

    Returns:
        The same ``df`` object with the indicator columns added.

    Raises:
        ValueError: If ``df`` has fewer than ``min_bars`` rows.
    """
    if len(df) < min_bars:
        raise ValueError(f"Too few bars: {len(df)} < {min_bars}")

    close = df["close"].to_numpy()
    high = df["high"].to_numpy()
    low = df["low"].to_numpy()

    # Trend and volatility basics.
    df["ema20"] = ema(close, 20)
    df["ema50"] = ema(close, 50)
    df["rsi14"] = rsi(close, 14)
    df["atr14"] = atr(high, low, close, 14)

    # MACD: the histogram is computed but not stored.
    df["macd"], df["macd_sig"], _ = macd(close)

    df["stoch_k"], df["stoch_d"] = stoch_kd(high, low, close, 14, 3)
    df["bb_mid"], df["bb_up"], df["bb_dn"] = bollinger(close, 20, 2.0)
    df["adx"], df["pdi"], df["mdi"] = adx_val(high, low, close, 14)
    df["supertrend"] = supertrend(high, low, close, 10, 3.0)
    return df
|