334 lines
14 KiB
Python
334 lines
14 KiB
Python
# portfolio.py
|
||
from __future__ import annotations
|
||
import math, time, json
|
||
from typing import Dict, List, Any, Optional
|
||
|
||
# ========= TRYB KSIĘGOWOŚCI =========
|
||
# "stock" – akcje (short dodaje gotówkę przy otwarciu, przy zamknięciu oddajesz notional)
|
||
# "margin" – FX/CFD (short NIE zmienia cash przy otwarciu; cash zmienia się o zrealizowany PnL przy zamknięciu)
|
||
ACCOUNTING_MODE = "margin"
|
||
|
||
# ========= RYZYKO / ATR =========
|
||
USE_ATR = True # jeśli sygnał ma "atr" > 0, to SL/TP liczone od ATR
|
||
SL_ATR_MULT = 2.0 # SL = 2.0 * ATR
|
||
TP_ATR_MULT = 3.0 # TP = 3.0 * ATR
|
||
RISK_FRACTION = 0.003 # 0.3% equity na trade (position sizing wg 1R)
|
||
|
||
# fallback procentowy, gdy brak ATR
|
||
SL_PCT = 0.010
|
||
TP_PCT = 0.020
|
||
TRAIL_PCT = 0.020 # spokojniejszy trailing
|
||
|
||
SYMBOL_OVERRIDES = {
|
||
# "EURUSD=X": {"SL_PCT": 0.0025, "TP_PCT": 0.0050, "TRAIL_PCT": 0.0030},
|
||
}
|
||
|
||
# ========= LIMITY BUDŻETOWE =========
|
||
ALLOC_FRACTION = 0.25 # max % gotówki na JEDEN LONG
|
||
MIN_TRADE_CASH = 25.0
|
||
MAX_NEW_POSITIONS_PER_CYCLE = 2 # None aby wyłączyć limit
|
||
|
||
# =================================
|
||
def _to_native(obj: Any):
|
||
"""JSON-safe: NaN/Inf -> None; rekurencyjnie czyści struktury."""
|
||
if isinstance(obj, float):
|
||
return None if (math.isnan(obj) or math.isinf(obj)) else obj
|
||
if isinstance(obj, (int, str)) or obj is None:
|
||
return obj
|
||
if isinstance(obj, dict):
|
||
return {k: _to_native(v) for k, v in obj.items()}
|
||
if isinstance(obj, (list, tuple)):
|
||
return [_to_native(x) for x in obj]
|
||
try:
|
||
return float(obj)
|
||
except Exception:
|
||
return str(obj)
|
||
|
||
def save_json(path: str, data: Any) -> None:
|
||
with open(path, "w", encoding="utf-8") as f:
|
||
json.dump(_to_native(data), f, ensure_ascii=False, indent=2, allow_nan=False)
|
||
|
||
class Portfolio:
|
||
"""
|
||
LONG open: cash -= qty*price ; close: cash += qty*price ; PnL = (px - entry)*qty
|
||
SHORT open (stock): cash += qty*price ; close: cash -= qty*price ; PnL = (entry - px)*qty
|
||
SHORT open (margin): cash bez zmian ; close: cash += PnL ; PnL = (entry - px)*qty
|
||
|
||
total_value:
|
||
- stock -> cash + positions_net (wartość likwidacyjna)
|
||
- margin -> start_capital + realized_pnl + unrealized_pnl
|
||
"""
|
||
def __init__(self, capital: float):
|
||
self.cash = float(capital)
|
||
self.start_capital = float(capital)
|
||
self.positions: Dict[str, Dict] = {}
|
||
self.history: List[Dict] = []
|
||
self.trade_log: List[Dict] = []
|
||
self.realized_pnl: float = 0.0
|
||
self.last_prices: Dict[str, float] = {} # ostatnie znane ceny
|
||
|
||
# ---------- helpers ----------
|
||
def _get_params(self, ticker: str):
|
||
o = SYMBOL_OVERRIDES.get(ticker, {})
|
||
sl = float(o.get("SL_PCT", SL_PCT))
|
||
tp = float(o.get("TP_PCT", TP_PCT))
|
||
tr = float(o.get("TRAIL_PCT", TRAIL_PCT))
|
||
return sl, tp, tr
|
||
|
||
def _init_risk(self, ticker: str, entry: float, side: int, atr: Optional[float] = None):
|
||
"""Zwraca: stop, take, trail_best, trail_stop, one_r."""
|
||
sl_pct, tp_pct, trail_pct = self._get_params(ticker)
|
||
use_atr = USE_ATR and atr is not None and isinstance(atr, (int, float)) and atr > 0.0
|
||
|
||
if use_atr:
|
||
sl_dist = SL_ATR_MULT * float(atr)
|
||
tp_dist = TP_ATR_MULT * float(atr)
|
||
stop = entry - sl_dist if side == 1 else entry + sl_dist
|
||
take = entry + tp_dist if side == 1 else entry - tp_dist
|
||
one_r = sl_dist
|
||
else:
|
||
if side == 1:
|
||
stop = entry * (1.0 - sl_pct); take = entry * (1.0 + tp_pct)
|
||
else:
|
||
stop = entry * (1.0 + sl_pct); take = entry * (1.0 - tp_pct)
|
||
one_r = abs(entry - stop)
|
||
|
||
trail_best = entry
|
||
trail_stop = stop
|
||
return stop, take, trail_best, trail_stop, max(one_r, 1e-8)
|
||
|
||
def _update_trailing(self, ticker: str, pos: Dict, price: float):
|
||
"""Breakeven po 1R + trailing procentowy."""
|
||
_, _, trail_pct = self._get_params(ticker)
|
||
|
||
# BE po 1R
|
||
if pos["side"] == 1 and not pos.get("be_done", False):
|
||
if price >= pos["entry"] + pos["one_r"]:
|
||
pos["stop"] = max(pos["stop"], pos["entry"])
|
||
pos["be_done"] = True
|
||
elif pos["side"] == -1 and not pos.get("be_done", False):
|
||
if price <= pos["entry"] - pos["one_r"]:
|
||
pos["stop"] = min(pos["stop"], pos["entry"])
|
||
pos["be_done"] = True
|
||
|
||
# trailing
|
||
if pos["side"] == 1:
|
||
if price > pos["trail_best"]:
|
||
pos["trail_best"] = price
|
||
pos["trail_stop"] = pos["trail_best"] * (1.0 - trail_pct)
|
||
else:
|
||
if price < pos["trail_best"]:
|
||
pos["trail_best"] = price
|
||
pos["trail_stop"] = pos["trail_best"] * (1.0 + trail_pct)
|
||
|
||
def _positions_values(self, prices: Optional[Dict[str, float]] = None) -> Dict[str, float]:
|
||
"""Zwraca Σ|qty*px| oraz Σqty*px*side. Fallback ceny: prices -> self.last_prices -> entry."""
|
||
gross = 0.0
|
||
net = 0.0
|
||
for t, p in self.positions.items():
|
||
px = None
|
||
if prices is not None:
|
||
px = prices.get(t)
|
||
if px is None or (isinstance(px, float) and math.isnan(px)):
|
||
px = self.last_prices.get(t)
|
||
if px is None or (isinstance(px, float) and math.isnan(px)):
|
||
px = p["entry"]
|
||
gross += abs(p["qty"] * px)
|
||
net += p["qty"] * px * p["side"]
|
||
return {"positions_gross": gross, "positions_net": net}
|
||
|
||
def unrealized_pnl(self, prices: Dict[str, float]) -> float:
|
||
"""Σ side * (px - entry) * qty."""
|
||
upnl = 0.0
|
||
for t, p in self.positions.items():
|
||
px = prices.get(t, self.last_prices.get(t, p["entry"]))
|
||
if px is None or (isinstance(px, float) and math.isnan(px)): px = p["entry"]
|
||
upnl += p["side"] * (px - p["entry"]) * p["qty"]
|
||
return upnl
|
||
|
||
def equity_from_start(self, prices: Dict[str, float]) -> float:
|
||
return self.start_capital + self.realized_pnl + self.unrealized_pnl(prices)
|
||
|
||
# ---------- zamknięcia / log ----------
|
||
def _close_position(self, ticker: str, price: float, reason: str):
|
||
pos = self.positions.get(ticker)
|
||
if not pos: return
|
||
qty, entry, side = pos["qty"], pos["entry"], pos["side"]
|
||
|
||
if side == 1:
|
||
self.cash += qty * price
|
||
pnl_abs = (price - entry) * qty
|
||
else:
|
||
pnl_abs = (entry - price) * qty
|
||
if ACCOUNTING_MODE == "stock":
|
||
self.cash -= qty * price
|
||
else:
|
||
self.cash += pnl_abs # margin: tylko PnL wpływa na cash
|
||
|
||
denom = max(qty * entry, 1e-12)
|
||
pnl_pct = pnl_abs / denom
|
||
self.realized_pnl += pnl_abs
|
||
|
||
self._log_trade("SELL", ticker, price, qty, side, pnl_abs, pnl_pct, reason=reason)
|
||
del self.positions[ticker]
|
||
|
||
def _log_trade(self, action: str, ticker: str, price: float, qty: float, side: int,
|
||
pnl_abs: float = 0.0, pnl_pct: float = 0.0, reason: Optional[str] = None):
|
||
ts = time.strftime("%Y-%m-%d %H:%M:%S")
|
||
if action == "BUY":
|
||
print(f"[{ts}] BUY {ticker} @ {price:.6f}, qty={qty:.6f}, side={'LONG' if side==1 else 'SHORT'}, cash={self.cash:.2f}")
|
||
else:
|
||
r = f", reason={reason}" if reason else ""
|
||
print(f"[{ts}] SELL {ticker} @ {price:.6f}, qty={qty:.6f}, PnL={pnl_abs:+.2f} ({pnl_pct:+.2%}), cash={self.cash:.2f}{r}")
|
||
self.trade_log.append({
|
||
"time": ts, "action": action, "ticker": ticker, "price": float(price),
|
||
"qty": float(qty), "side": int(side),
|
||
"pnl_abs": float(pnl_abs), "pnl_pct": float(pnl_pct),
|
||
"realized_pnl_cum": float(self.realized_pnl), "cash_after": float(self.cash),
|
||
"reason": reason or ""
|
||
})
|
||
|
||
# ---------- główna aktualizacja ----------
|
||
def on_signals(self, sigs: List[dict]):
|
||
# normalizacja
|
||
clean: List[Dict[str, Any]] = []
|
||
for s in sigs:
|
||
if isinstance(s, dict):
|
||
px = s.get("price")
|
||
if s.get("error") is None and px is not None and not math.isnan(px):
|
||
clean.append(s)
|
||
else:
|
||
if getattr(s, "error", None) is None and not math.isnan(getattr(s, "price", float("nan"))):
|
||
clean.append({
|
||
"ticker": s.ticker, "price": s.price, "signal": s.signal, "time": s.time,
|
||
"atr": getattr(s, "atr", None)
|
||
})
|
||
|
||
# snapshot, gdy brak świeżych danych
|
||
if not clean:
|
||
vals = self._positions_values(None)
|
||
prices_map = {t: self.last_prices.get(t, p["entry"]) for t, p in self.positions.items()}
|
||
if ACCOUNTING_MODE == "stock":
|
||
account_value = self.cash + vals["positions_net"]
|
||
else:
|
||
account_value = self.equity_from_start(prices_map)
|
||
unreal = self.unrealized_pnl(prices_map)
|
||
self.history.append({
|
||
"time": time.strftime("%Y-%m-%d %H:%M:%S"),
|
||
"cash": float(self.cash),
|
||
"positions_value": float(vals["positions_gross"]),
|
||
"positions_net": float(vals["positions_net"]),
|
||
"total_value": float(account_value),
|
||
"equity_from_start": float(self.start_capital + self.realized_pnl + unreal),
|
||
"unrealized_pnl": float(unreal),
|
||
"realized_pnl_cum": float(self.realized_pnl),
|
||
"open_positions": int(len(self.positions))
|
||
})
|
||
save_json("portfolio_history.json", self.history)
|
||
return
|
||
|
||
# mapy cen
|
||
prices = {s["ticker"]: float(s["price"]) for s in clean}
|
||
self.last_prices.update(prices)
|
||
|
||
# 1) risk exits (SL/TP/TRAIL + BE)
|
||
to_close = []
|
||
for t, pos in list(self.positions.items()):
|
||
price = prices.get(t)
|
||
if price is None or math.isnan(price): continue
|
||
self._update_trailing(t, pos, price)
|
||
if pos["side"] == 1:
|
||
if price <= pos["stop"]: to_close.append((t, "SL"))
|
||
elif price >= pos["take"]: to_close.append((t, "TP"))
|
||
elif price <= pos.get("trail_stop", -float("inf")): to_close.append((t, "TRAIL"))
|
||
else:
|
||
if price >= pos["stop"]: to_close.append((t, "SL"))
|
||
elif price <= pos["take"]: to_close.append((t, "TP"))
|
||
elif price >= pos.get("trail_stop", float("inf")): to_close.append((t, "TRAIL"))
|
||
for t, reason in to_close:
|
||
price = prices.get(t)
|
||
if price is not None and not math.isnan(price) and t in self.positions:
|
||
self._close_position(t, price, reason=reason)
|
||
|
||
# 2) zamknięcie na przeciwny/HOLD
|
||
for s in clean:
|
||
t, sig = s["ticker"], int(s["signal"])
|
||
price = float(s["price"])
|
||
if t in self.positions:
|
||
pos = self.positions[t]
|
||
if sig == 0 or sig != pos["side"]:
|
||
self._close_position(t, price, reason="SIGNAL")
|
||
|
||
# 3) otwarcia – ATR sizing + limity
|
||
candidates = [s for s in clean if s["ticker"] not in self.positions and int(s["signal"]) != 0]
|
||
if MAX_NEW_POSITIONS_PER_CYCLE and MAX_NEW_POSITIONS_PER_CYCLE > 0:
|
||
candidates = candidates[:MAX_NEW_POSITIONS_PER_CYCLE]
|
||
|
||
for s in candidates:
|
||
t, sig, price = s["ticker"], int(s["signal"]), float(s["price"])
|
||
atr = s.get("atr", None)
|
||
if price <= 0: continue
|
||
|
||
stop, take, trail_best, trail_stop, one_r = self._init_risk(t, price, sig, atr)
|
||
|
||
# equity teraz (dla sizingu ryzyka)
|
||
if ACCOUNTING_MODE == "margin":
|
||
equity_now = self.start_capital + self.realized_pnl + self.unrealized_pnl(self.last_prices)
|
||
else:
|
||
vals_tmp = self._positions_values(self.last_prices)
|
||
equity_now = self.cash + vals_tmp["positions_net"]
|
||
|
||
risk_amount = max(1e-6, equity_now * RISK_FRACTION)
|
||
qty_risk = risk_amount / max(one_r, 1e-8)
|
||
|
||
# limit gotówki dla LONG
|
||
per_trade_cash = max(MIN_TRADE_CASH, self.cash * ALLOC_FRACTION)
|
||
qty_cash = per_trade_cash / max(price, 1e-12) if sig == 1 else float("inf")
|
||
|
||
qty = max(0.0, min(qty_risk, qty_cash))
|
||
if qty <= 0: continue
|
||
|
||
# księgowanie gotówki przy otwarciu
|
||
if sig == 1:
|
||
cost = qty * price
|
||
if self.cash < cost: continue
|
||
self.cash -= cost
|
||
else:
|
||
if ACCOUNTING_MODE == "stock":
|
||
self.cash += qty * price
|
||
# margin: brak zmiany cash przy short open
|
||
|
||
self.positions[t] = {
|
||
"qty": qty, "entry": price, "side": sig,
|
||
"stop": stop, "take": take,
|
||
"trail_best": trail_best, "trail_stop": trail_stop,
|
||
"one_r": one_r, "be_done": False
|
||
}
|
||
self._log_trade("BUY" if sig == 1 else "SELL", t, price, qty, sig)
|
||
|
||
# 4) snapshot na bieżących/ostatnich cenach
|
||
vals = self._positions_values(self.last_prices)
|
||
prices_map = {t: self.last_prices.get(t, p["entry"]) for t, p in self.positions.items()}
|
||
if ACCOUNTING_MODE == "stock":
|
||
account_value = self.cash + vals["positions_net"]
|
||
else:
|
||
account_value = self.equity_from_start(prices_map)
|
||
unreal = self.unrealized_pnl(prices_map)
|
||
|
||
self.history.append({
|
||
"time": clean[0]["time"],
|
||
"cash": float(self.cash),
|
||
"positions_value": float(vals["positions_gross"]),
|
||
"positions_net": float(vals["positions_net"]),
|
||
"total_value": float(account_value),
|
||
"equity_from_start": float(self.start_capital + self.realized_pnl + unreal),
|
||
"unrealized_pnl": float(unreal),
|
||
"realized_pnl_cum": float(self.realized_pnl),
|
||
"open_positions": int(len(self.positions))
|
||
})
|
||
|
||
# 5) zapisy
|
||
save_json("trade_log.json", self.trade_log)
|
||
save_json("portfolio_history.json", self.history)
|
||
save_json("positions.json", [{"ticker": t, **p} for t, p in self.positions.items()])
|