# trading_bot_workers.py # pip install yfinance pandas numpy from __future__ import annotations import time, warnings, random, multiprocessing as mp, json from dataclasses import dataclass, asdict from typing import Dict, List, Tuple, Optional import pandas as pd import yfinance as yf from indicators import add_indicators from strategies import get_signal from portfolio import Portfolio warnings.filterwarnings("ignore", category=FutureWarning) # ========= KONFIG ========= START_CAPITAL = 10_000.0 MIN_BARS = 200 SCAN_EVERY_S = 60 YF_JITTER_S = (0.05, 0.25) PRIMARY = ("7d","1m") # preferowane: 1m/7d FALLBACKS = [("60d","5m"), ("60d","15m")] # fallbacki gdy 1m brak FOREX_20 = [ "EURUSD=X","USDJPY=X","GBPUSD=X","USDCHF=X","AUDUSD=X", "USDCAD=X","NZDUSD=X","EURJPY=X","EURGBP=X","EURCHF=X", "GBPJPY=X","CHFJPY=X","AUDJPY=X","AUDNZD=X","EURAUD=X", "EURCAD=X","GBPCAD=X","CADJPY=X","NZDJPY=X","GC=F" ] CRYPTO_20 = [ "BTC-USD","ETH-USD","BNB-USD","SOL-USD","XRP-USD","ADA-USD","DOGE-USD","TRX-USD","TON-USD","DOT-USD", "LTC-USD","BCH-USD","ATOM-USD","LINK-USD","XLM-USD","ETC-USD","NEAR-USD","OP-USD" ] ALL_TICKERS = FOREX_20 + CRYPTO_20 # ========= Pobieranie z fallbackiem ========= def yf_download_with_fallback(ticker: str) -> Tuple[pd.DataFrame, str, str]: time.sleep(random.uniform(*YF_JITTER_S)) for period, interval in (PRIMARY, *FALLBACKS): df = yf.download(ticker, period=period, interval=interval, auto_adjust=False, progress=False, threads=False) if df is not None and not df.empty: df = df.rename(columns=str.lower).dropna() if not isinstance(df.index, pd.DatetimeIndex): df.index = pd.to_datetime(df.index) return df, period, interval raise ValueError("No data in primary/fallback intervals") # ========= Worker: jeden instrument = jeden proces ========= @dataclass class Signal: ticker: str time: str price: float signal: int period: str interval: str error: Optional[str] = None def worker(ticker: str, out_q: mp.Queue, stop_evt: mp.Event, min_bars: int = MIN_BARS): while not stop_evt.is_set(): try: df, used_period, used_interval = yf_download_with_fallback(ticker) if len(df) < min_bars: raise ValueError(f"Too few bars: {len(df)}<{min_bars} at {used_interval}") df = add_indicators(df, min_bars=min_bars) sig = get_signal(df) price = float(df["close"].iloc[-1]) ts = str(df.index[-1]) out_q.put(Signal(ticker, ts, price, sig, used_period, used_interval)) except Exception as e: out_q.put(Signal(ticker, time.strftime("%Y-%m-%d %H:%M:%S"), float("nan"), 0, "NA", "NA", error=str(e))) time.sleep(SCAN_EVERY_S) # ========= Pomocnicze: serializacja do JSON ========= def _to_native(obj): """Bezpieczny rzut na prymitywy JSON (float/int/str/bool/None).""" if isinstance(obj, (float, int, str)) or obj is None: return obj if isinstance(obj, dict): return {k: _to_native(v) for k, v in obj.items()} if isinstance(obj, (list, tuple)): return [_to_native(x) for x in obj] if hasattr(obj, "__dataclass_fields__"): # dataclass return _to_native(asdict(obj)) try: return float(obj) except Exception: return str(obj) def save_json(path: str, data) -> None: with open(path, "w", encoding="utf-8") as f: json.dump(_to_native(data), f, ensure_ascii=False, indent=2) # ========= Master (koordynacja) ========= def main(): mp.set_start_method("spawn", force=True) out_q: mp.Queue = mp.Queue() stop_evt: mp.Event = mp.Event() # 1 instrument = 1 proces (daemon) procs: List[mp.Process] = [] for t in ALL_TICKERS: p = mp.Process(target=worker, args=(t, out_q, stop_evt), daemon=True) p.start() procs.append(p) portfolio = Portfolio(START_CAPITAL) last_dump = 0.0 signals_window: Dict[str, Signal] = {} try: while True: # zbieramy sygnały przez okno ~1 minuty deadline = time.time() + SCAN_EVERY_S while time.time() < deadline: try: s: Signal = out_q.get(timeout=0.5) signals_window[s.ticker] = s except Exception: pass if signals_window: sig_list = list(signals_window.values()) portfolio.on_signals(sig_list) now = time.time() if now - last_dump > SCAN_EVERY_S - 1: # --- JSON zapisy --- save_json("signals_scan.json", [asdict(s) for s in sig_list]) save_json("portfolio_history.json", portfolio.history) save_json("positions.json", [{"ticker": t, **p} for t, p in portfolio.positions.items()]) snapshot = { "time": time.strftime("%Y-%m-%d %H:%M:%S"), "last_history": portfolio.history[-1] if portfolio.history else None, "positions": [{"ticker": t, **p} for t, p in portfolio.positions.items()], "signals": [asdict(s) for s in sig_list], "capital_start": START_CAPITAL } save_json("snapshot.json", snapshot) last_dump = now if portfolio.history: print(pd.DataFrame(portfolio.history).tail(1).to_string(index=False)) else: print("Brak sygnałów w oknie – czekam...") except KeyboardInterrupt: print("\nStopping workers...") stop_evt.set() for p in procs: p.join(timeout=5) if __name__ == "__main__": main()