stock/trading_bot.py
2025-08-15 12:32:27 +02:00

158 lines
5.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# trading_bot_workers.py
# pip install yfinance pandas numpy
from __future__ import annotations
import time, warnings, random, multiprocessing as mp, json
from dataclasses import dataclass, asdict
from typing import Dict, List, Tuple, Optional
import pandas as pd
import yfinance as yf
from indicators import add_indicators
from strategies import get_signal
from portfolio import Portfolio
# Silence FutureWarning messages process-wide.
warnings.filterwarnings("ignore", category=FutureWarning)
# ========= CONFIG =========
# Starting capital handed to Portfolio() and echoed into snapshot.json.
START_CAPITAL = 10_000.0
# Default minimum number of rows a worker requires before computing indicators.
MIN_BARS = 200
# Seconds a worker pauses between scans; also the length of main()'s collection window.
SCAN_EVERY_S = 60
# (min, max) seconds of random delay applied before each download.
YF_JITTER_S = (0.05, 0.25)
# Each entry below is a (period, interval) pair passed to yf.download.
PRIMARY = ("7d","1m") # preferred: 1m bars over a 7d period
FALLBACKS = [("60d","5m"), ("60d","15m")] # tried in order when the 1m request yields no data
# 20 symbols; the last one (GC=F) does not follow the "<pair>=X" pattern of the others
# NOTE(review): presumably a futures symbol rather than an FX pair - confirm it belongs here.
FOREX_20 = [
    "EURUSD=X","USDJPY=X","GBPUSD=X","USDCHF=X","AUDUSD=X",
    "USDCAD=X","NZDUSD=X","EURJPY=X","EURGBP=X","EURCHF=X",
    "GBPJPY=X","CHFJPY=X","AUDJPY=X","AUDNZD=X","EURAUD=X",
    "EURCAD=X","GBPCAD=X","CADJPY=X","NZDJPY=X","GC=F"
]
# NOTE: despite the name, this list currently holds 18 symbols, not 20.
CRYPTO_20 = [
    "BTC-USD","ETH-USD","BNB-USD","SOL-USD","XRP-USD","ADA-USD","DOGE-USD","TRX-USD","TON-USD","DOT-USD",
    "LTC-USD","BCH-USD","ATOM-USD","LINK-USD","XLM-USD","ETC-USD","NEAR-USD","OP-USD"
]
# Every symbol gets its own worker process in main().
ALL_TICKERS = FOREX_20 + CRYPTO_20
# ========= Download with fallback =========
def yf_download_with_fallback(ticker: str) -> Tuple[pd.DataFrame, str, str]:
    """Download bars for ``ticker``, falling back to coarser intervals.

    The (period, interval) pairs are tried in order: ``PRIMARY`` first, then
    each entry of ``FALLBACKS``. The first request that still has rows after
    cleaning wins.

    Returns:
        ``(df, period, interval)`` where ``df`` has lower-cased, single-level
        column names, no rows containing NaN, and a ``DatetimeIndex``;
        ``period`` and ``interval`` are the pair that produced it.

    Raises:
        ValueError: if no pair yields any usable rows.
    """
    # Small random delay before the request sequence.
    time.sleep(random.uniform(*YF_JITTER_S))
    for period, interval in (PRIMARY, *FALLBACKS):
        df = yf.download(ticker, period=period, interval=interval, auto_adjust=False, progress=False, threads=False)
        if df is None or df.empty:
            continue
        # Newer yfinance releases return (price, ticker) MultiIndex columns even
        # for a single ticker; keep only the price level so df["close"] is a Series.
        if isinstance(df.columns, pd.MultiIndex):
            df.columns = df.columns.get_level_values(0)
        df = df.rename(columns=str.lower).dropna()
        if df.empty:
            # Every row contained a NaN: treat it like "no data" and try the next pair.
            continue
        if not isinstance(df.index, pd.DatetimeIndex):
            df.index = pd.to_datetime(df.index)
        return df, period, interval
    raise ValueError("No data in primary/fallback intervals")
# ========= Worker: one instrument = one process =========
@dataclass
class Signal:
    """Result of one scan of one ticker, sent from a worker to the main process.

    When a scan fails, the worker fills ``price`` with NaN, ``signal`` with 0,
    ``period`` and ``interval`` with "NA", and stores the exception text in
    ``error``.
    """
    ticker: str  # symbol that was scanned
    time: str  # str() of the last bar's index on success; local wall-clock time on error
    price: float  # last value of the "close" column; NaN on error
    signal: int  # value returned by get_signal(); 0 on error
    period: str  # history period that produced the data; "NA" on error
    interval: str  # bar interval that produced the data; "NA" on error
    error: Optional[str] = None  # exception text when the scan failed, otherwise None
def worker(ticker: str, out_q: mp.Queue, stop_evt: mp.Event, min_bars: int = MIN_BARS):
    """Scan one ticker repeatedly until ``stop_evt`` is set.

    Each iteration downloads bars, computes indicators, derives a signal and
    puts exactly one ``Signal`` on ``out_q``. A failure of any step is
    reported as a ``Signal`` whose ``error`` field is set instead of being
    raised, so the loop keeps running.

    Args:
        ticker: symbol to scan.
        out_q: queue on which results are published.
        stop_evt: event that ends the loop once set.
        min_bars: minimum number of rows required; fewer is reported as an error.
    """
    while not stop_evt.is_set():
        try:
            df, used_period, used_interval = yf_download_with_fallback(ticker)
            if len(df) < min_bars:
                raise ValueError(f"Too few bars: {len(df)}<{min_bars} at {used_interval}")
            df = add_indicators(df, min_bars=min_bars)
            sig = get_signal(df)
            price = float(df["close"].iloc[-1])
            ts = str(df.index[-1])
            out_q.put(Signal(ticker, ts, price, sig, used_period, used_interval))
        except Exception as e:
            # Some exceptions stringify to ""; fall back to the class name so that
            # the error field is never empty for a failed scan.
            message = str(e) or type(e).__name__
            out_q.put(Signal(ticker, time.strftime("%Y-%m-%d %H:%M:%S"), float("nan"), 0, "NA", "NA", error=message))
        # Wait on the event instead of sleeping: wait() returns True as soon as the
        # event is set, so a stop request is honoured without finishing the full pause.
        if stop_evt.wait(SCAN_EVERY_S):
            break
# ========= Helpers: JSON serialization =========
def _to_native(obj):
"""Bezpieczny rzut na prymitywy JSON (float/int/str/bool/None)."""
if isinstance(obj, (float, int, str)) or obj is None:
return obj
if isinstance(obj, dict):
return {k: _to_native(v) for k, v in obj.items()}
if isinstance(obj, (list, tuple)):
return [_to_native(x) for x in obj]
if hasattr(obj, "__dataclass_fields__"): # dataclass
return _to_native(asdict(obj))
try:
return float(obj)
except Exception:
return str(obj)
def save_json(path: str, data) -> None:
    """Write ``data`` to ``path`` as indented UTF-8 JSON.

    ``data`` is first passed through ``_to_native``. Non-ASCII characters are
    written as-is (``ensure_ascii=False``).

    Args:
        path: destination file; overwritten if it exists.
        data: value to serialize.
    """
    # Serialize before opening the file: open(..., "w") truncates immediately, so a
    # failure during conversion or encoding would otherwise destroy the previous
    # contents and leave an empty or partial file behind.
    text = json.dumps(_to_native(data), ensure_ascii=False, indent=2)
    with open(path, "w", encoding="utf-8") as f:
        f.write(text)
# ========= Master (coordination) =========
def main():
    """Start one worker process per ticker and drive the portfolio from their signals.

    Loop, until interrupted:
      1. Collect signals from the workers for ``SCAN_EVERY_S`` seconds, keeping
         the most recent one per ticker.
      2. Pass the collected signals to ``portfolio.on_signals``.
      3. At most about once per ``SCAN_EVERY_S`` seconds, write
         ``signals_scan.json``, ``portfolio_history.json``, ``positions.json``
         and ``snapshot.json`` to the current directory.
      4. Print the latest portfolio history row, if any.

    Workers are asked to stop and are reaped whenever the loop exits, whether
    through Ctrl+C or an unexpected exception.
    """
    # Function-scope import: only needed for the queue.Empty timeout signal.
    import queue

    mp.set_start_method("spawn", force=True)
    out_q: mp.Queue = mp.Queue()
    stop_evt: mp.Event = mp.Event()
    # One instrument = one (daemon) process.
    procs: List[mp.Process] = []
    for t in ALL_TICKERS:
        p = mp.Process(target=worker, args=(t, out_q, stop_evt), daemon=True)
        p.start()
        procs.append(p)
    portfolio = Portfolio(START_CAPITAL)
    last_dump = 0.0
    # Latest signal per ticker; entries persist across windows until replaced.
    signals_window: Dict[str, Signal] = {}
    try:
        while True:
            # Collect signals over a window of about SCAN_EVERY_S seconds.
            deadline = time.time() + SCAN_EVERY_S
            while time.time() < deadline:
                try:
                    s: Signal = out_q.get(timeout=0.5)
                    signals_window[s.ticker] = s
                except queue.Empty:
                    # Nothing arrived within the poll interval; keep waiting.
                    pass
                except Exception as exc:
                    # Stay best-effort, but do not hide real failures.
                    print(f"Queue read failed: {exc!r}")
            if signals_window:
                sig_list = list(signals_window.values())
                portfolio.on_signals(sig_list)
                now = time.time()
                if now - last_dump > SCAN_EVERY_S - 1:
                    # --- JSON dumps ---
                    # Build the row lists once; they feed both the individual
                    # files and the combined snapshot.
                    signal_rows = [asdict(s) for s in sig_list]
                    position_rows = [{"ticker": t, **p} for t, p in portfolio.positions.items()]
                    save_json("signals_scan.json", signal_rows)
                    save_json("portfolio_history.json", portfolio.history)
                    save_json("positions.json", position_rows)
                    snapshot = {
                        "time": time.strftime("%Y-%m-%d %H:%M:%S"),
                        "last_history": portfolio.history[-1] if portfolio.history else None,
                        "positions": position_rows,
                        "signals": signal_rows,
                        "capital_start": START_CAPITAL,
                    }
                    save_json("snapshot.json", snapshot)
                    last_dump = now
                if portfolio.history:
                    print(pd.DataFrame(portfolio.history).tail(1).to_string(index=False))
            else:
                print("Brak sygnałów w oknie czekam...")
    except KeyboardInterrupt:
        print("\nStopping workers...")
    finally:
        stop_evt.set()
        # One shared deadline bounds the whole shutdown to about 5 seconds,
        # instead of up to 5 seconds per worker.
        join_deadline = time.time() + 5
        for p in procs:
            p.join(timeout=max(0.0, join_deadline - time.time()))
            if p.is_alive():
                # Still busy (e.g. mid-download): daemon workers would be killed at
                # interpreter exit anyway, so end them explicitly now.
                p.terminate()
# Run main() only when executed as a script. main() selects the "spawn" start
# method, under which child processes import this module again, so the guard
# keeps them from launching workers of their own.
if __name__ == "__main__":
    main()