158 lines
5.7 KiB
Python
158 lines
5.7 KiB
Python
# trading_bot_workers.py
|
||
# pip install yfinance pandas numpy
|
||
|
||
from __future__ import annotations
|
||
import time, warnings, random, multiprocessing as mp, json
|
||
from dataclasses import dataclass, asdict
|
||
from typing import Dict, List, Tuple, Optional
|
||
import pandas as pd
|
||
import yfinance as yf
|
||
|
||
from indicators import add_indicators
|
||
from strategies import get_signal
|
||
from portfolio import Portfolio
|
||
|
||
warnings.filterwarnings("ignore", category=FutureWarning)
|
||
|
||
# ========= KONFIG =========
|
||
START_CAPITAL = 10_000.0
|
||
MIN_BARS = 200
|
||
SCAN_EVERY_S = 60
|
||
YF_JITTER_S = (0.05, 0.25)
|
||
PRIMARY = ("7d","1m") # preferowane: 1m/7d
|
||
FALLBACKS = [("60d","5m"), ("60d","15m")] # fallbacki gdy 1m brak
|
||
|
||
FOREX_20 = [
|
||
"EURUSD=X","USDJPY=X","GBPUSD=X","USDCHF=X","AUDUSD=X",
|
||
"USDCAD=X","NZDUSD=X","EURJPY=X","EURGBP=X","EURCHF=X",
|
||
"GBPJPY=X","CHFJPY=X","AUDJPY=X","AUDNZD=X","EURAUD=X",
|
||
"EURCAD=X","GBPCAD=X","CADJPY=X","NZDJPY=X","GC=F"
|
||
]
|
||
CRYPTO_20 = [
|
||
"BTC-USD","ETH-USD","BNB-USD","SOL-USD","XRP-USD","ADA-USD","DOGE-USD","TRX-USD","TON-USD","DOT-USD",
|
||
"LTC-USD","BCH-USD","ATOM-USD","LINK-USD","XLM-USD","ETC-USD","NEAR-USD","OP-USD"
|
||
]
|
||
ALL_TICKERS = FOREX_20 + CRYPTO_20
|
||
|
||
# ========= Pobieranie z fallbackiem =========
|
||
def yf_download_with_fallback(ticker: str) -> Tuple[pd.DataFrame, str, str]:
|
||
time.sleep(random.uniform(*YF_JITTER_S))
|
||
for period, interval in (PRIMARY, *FALLBACKS):
|
||
df = yf.download(ticker, period=period, interval=interval, auto_adjust=False, progress=False, threads=False)
|
||
if df is not None and not df.empty:
|
||
df = df.rename(columns=str.lower).dropna()
|
||
if not isinstance(df.index, pd.DatetimeIndex):
|
||
df.index = pd.to_datetime(df.index)
|
||
return df, period, interval
|
||
raise ValueError("No data in primary/fallback intervals")
|
||
|
||
# ========= Worker: jeden instrument = jeden proces =========
|
||
@dataclass
|
||
class Signal:
|
||
ticker: str
|
||
time: str
|
||
price: float
|
||
signal: int
|
||
period: str
|
||
interval: str
|
||
error: Optional[str] = None
|
||
|
||
def worker(ticker: str, out_q: mp.Queue, stop_evt: mp.Event, min_bars: int = MIN_BARS):
|
||
while not stop_evt.is_set():
|
||
try:
|
||
df, used_period, used_interval = yf_download_with_fallback(ticker)
|
||
if len(df) < min_bars:
|
||
raise ValueError(f"Too few bars: {len(df)}<{min_bars} at {used_interval}")
|
||
df = add_indicators(df, min_bars=min_bars)
|
||
sig = get_signal(df)
|
||
price = float(df["close"].iloc[-1])
|
||
ts = str(df.index[-1])
|
||
out_q.put(Signal(ticker, ts, price, sig, used_period, used_interval))
|
||
except Exception as e:
|
||
out_q.put(Signal(ticker, time.strftime("%Y-%m-%d %H:%M:%S"), float("nan"), 0, "NA", "NA", error=str(e)))
|
||
time.sleep(SCAN_EVERY_S)
|
||
|
||
# ========= Pomocnicze: serializacja do JSON =========
|
||
def _to_native(obj):
|
||
"""Bezpieczny rzut na prymitywy JSON (float/int/str/bool/None)."""
|
||
if isinstance(obj, (float, int, str)) or obj is None:
|
||
return obj
|
||
if isinstance(obj, dict):
|
||
return {k: _to_native(v) for k, v in obj.items()}
|
||
if isinstance(obj, (list, tuple)):
|
||
return [_to_native(x) for x in obj]
|
||
if hasattr(obj, "__dataclass_fields__"): # dataclass
|
||
return _to_native(asdict(obj))
|
||
try:
|
||
return float(obj)
|
||
except Exception:
|
||
return str(obj)
|
||
|
||
def save_json(path: str, data) -> None:
|
||
with open(path, "w", encoding="utf-8") as f:
|
||
json.dump(_to_native(data), f, ensure_ascii=False, indent=2)
|
||
|
||
# ========= Master (koordynacja) =========
|
||
def main():
|
||
mp.set_start_method("spawn", force=True)
|
||
out_q: mp.Queue = mp.Queue()
|
||
stop_evt: mp.Event = mp.Event()
|
||
|
||
# 1 instrument = 1 proces (daemon)
|
||
procs: List[mp.Process] = []
|
||
for t in ALL_TICKERS:
|
||
p = mp.Process(target=worker, args=(t, out_q, stop_evt), daemon=True)
|
||
p.start()
|
||
procs.append(p)
|
||
|
||
portfolio = Portfolio(START_CAPITAL)
|
||
last_dump = 0.0
|
||
signals_window: Dict[str, Signal] = {}
|
||
|
||
try:
|
||
while True:
|
||
# zbieramy sygnały przez okno ~1 minuty
|
||
deadline = time.time() + SCAN_EVERY_S
|
||
while time.time() < deadline:
|
||
try:
|
||
s: Signal = out_q.get(timeout=0.5)
|
||
signals_window[s.ticker] = s
|
||
except Exception:
|
||
pass
|
||
|
||
if signals_window:
|
||
sig_list = list(signals_window.values())
|
||
portfolio.on_signals(sig_list)
|
||
|
||
now = time.time()
|
||
if now - last_dump > SCAN_EVERY_S - 1:
|
||
# --- JSON zapisy ---
|
||
save_json("signals_scan.json", [asdict(s) for s in sig_list])
|
||
save_json("portfolio_history.json", portfolio.history)
|
||
save_json("positions.json", [{"ticker": t, **p} for t, p in portfolio.positions.items()])
|
||
|
||
snapshot = {
|
||
"time": time.strftime("%Y-%m-%d %H:%M:%S"),
|
||
"last_history": portfolio.history[-1] if portfolio.history else None,
|
||
"positions": [{"ticker": t, **p} for t, p in portfolio.positions.items()],
|
||
"signals": [asdict(s) for s in sig_list],
|
||
"capital_start": START_CAPITAL
|
||
}
|
||
save_json("snapshot.json", snapshot)
|
||
|
||
last_dump = now
|
||
|
||
if portfolio.history:
|
||
print(pd.DataFrame(portfolio.history).tail(1).to_string(index=False))
|
||
else:
|
||
print("Brak sygnałów w oknie – czekam...")
|
||
|
||
except KeyboardInterrupt:
|
||
print("\nStopping workers...")
|
||
stop_evt.set()
|
||
for p in procs:
|
||
p.join(timeout=5)
|
||
|
||
if __name__ == "__main__":
|
||
main()
|