stock/portfolio.py
2025-08-15 12:32:27 +02:00

334 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# portfolio.py
from __future__ import annotations
import math, time, json
from typing import Dict, List, Any, Optional
# ========= TRYB KSIĘGOWOŚCI =========
# "stock" akcje (short dodaje gotówkę przy otwarciu, przy zamknięciu oddajesz notional)
# "margin" FX/CFD (short NIE zmienia cash przy otwarciu; cash zmienia się o zrealizowany PnL przy zamknięciu)
ACCOUNTING_MODE = "margin"
# ========= RYZYKO / ATR =========
USE_ATR = True # jeśli sygnał ma "atr" > 0, to SL/TP liczone od ATR
SL_ATR_MULT = 2.0 # SL = 2.0 * ATR
TP_ATR_MULT = 3.0 # TP = 3.0 * ATR
RISK_FRACTION = 0.003 # 0.3% equity na trade (position sizing wg 1R)
# fallback procentowy, gdy brak ATR
SL_PCT = 0.010
TP_PCT = 0.020
TRAIL_PCT = 0.020 # spokojniejszy trailing
SYMBOL_OVERRIDES = {
# "EURUSD=X": {"SL_PCT": 0.0025, "TP_PCT": 0.0050, "TRAIL_PCT": 0.0030},
}
# ========= LIMITY BUDŻETOWE =========
ALLOC_FRACTION = 0.25 # max % gotówki na JEDEN LONG
MIN_TRADE_CASH = 25.0
MAX_NEW_POSITIONS_PER_CYCLE = 2 # None aby wyłączyć limit
# =================================
def _to_native(obj: Any):
"""JSON-safe: NaN/Inf -> None; rekurencyjnie czyści struktury."""
if isinstance(obj, float):
return None if (math.isnan(obj) or math.isinf(obj)) else obj
if isinstance(obj, (int, str)) or obj is None:
return obj
if isinstance(obj, dict):
return {k: _to_native(v) for k, v in obj.items()}
if isinstance(obj, (list, tuple)):
return [_to_native(x) for x in obj]
try:
return float(obj)
except Exception:
return str(obj)
def save_json(path: str, data: Any) -> None:
with open(path, "w", encoding="utf-8") as f:
json.dump(_to_native(data), f, ensure_ascii=False, indent=2, allow_nan=False)
class Portfolio:
"""
LONG open: cash -= qty*price ; close: cash += qty*price ; PnL = (px - entry)*qty
SHORT open (stock): cash += qty*price ; close: cash -= qty*price ; PnL = (entry - px)*qty
SHORT open (margin): cash bez zmian ; close: cash += PnL ; PnL = (entry - px)*qty
total_value:
- stock -> cash + positions_net (wartość likwidacyjna)
- margin -> start_capital + realized_pnl + unrealized_pnl
"""
def __init__(self, capital: float):
self.cash = float(capital)
self.start_capital = float(capital)
self.positions: Dict[str, Dict] = {}
self.history: List[Dict] = []
self.trade_log: List[Dict] = []
self.realized_pnl: float = 0.0
self.last_prices: Dict[str, float] = {} # ostatnie znane ceny
# ---------- helpers ----------
def _get_params(self, ticker: str):
o = SYMBOL_OVERRIDES.get(ticker, {})
sl = float(o.get("SL_PCT", SL_PCT))
tp = float(o.get("TP_PCT", TP_PCT))
tr = float(o.get("TRAIL_PCT", TRAIL_PCT))
return sl, tp, tr
def _init_risk(self, ticker: str, entry: float, side: int, atr: Optional[float] = None):
"""Zwraca: stop, take, trail_best, trail_stop, one_r."""
sl_pct, tp_pct, trail_pct = self._get_params(ticker)
use_atr = USE_ATR and atr is not None and isinstance(atr, (int, float)) and atr > 0.0
if use_atr:
sl_dist = SL_ATR_MULT * float(atr)
tp_dist = TP_ATR_MULT * float(atr)
stop = entry - sl_dist if side == 1 else entry + sl_dist
take = entry + tp_dist if side == 1 else entry - tp_dist
one_r = sl_dist
else:
if side == 1:
stop = entry * (1.0 - sl_pct); take = entry * (1.0 + tp_pct)
else:
stop = entry * (1.0 + sl_pct); take = entry * (1.0 - tp_pct)
one_r = abs(entry - stop)
trail_best = entry
trail_stop = stop
return stop, take, trail_best, trail_stop, max(one_r, 1e-8)
def _update_trailing(self, ticker: str, pos: Dict, price: float):
"""Breakeven po 1R + trailing procentowy."""
_, _, trail_pct = self._get_params(ticker)
# BE po 1R
if pos["side"] == 1 and not pos.get("be_done", False):
if price >= pos["entry"] + pos["one_r"]:
pos["stop"] = max(pos["stop"], pos["entry"])
pos["be_done"] = True
elif pos["side"] == -1 and not pos.get("be_done", False):
if price <= pos["entry"] - pos["one_r"]:
pos["stop"] = min(pos["stop"], pos["entry"])
pos["be_done"] = True
# trailing
if pos["side"] == 1:
if price > pos["trail_best"]:
pos["trail_best"] = price
pos["trail_stop"] = pos["trail_best"] * (1.0 - trail_pct)
else:
if price < pos["trail_best"]:
pos["trail_best"] = price
pos["trail_stop"] = pos["trail_best"] * (1.0 + trail_pct)
def _positions_values(self, prices: Optional[Dict[str, float]] = None) -> Dict[str, float]:
"""Zwraca Σ|qty*px| oraz Σqty*px*side. Fallback ceny: prices -> self.last_prices -> entry."""
gross = 0.0
net = 0.0
for t, p in self.positions.items():
px = None
if prices is not None:
px = prices.get(t)
if px is None or (isinstance(px, float) and math.isnan(px)):
px = self.last_prices.get(t)
if px is None or (isinstance(px, float) and math.isnan(px)):
px = p["entry"]
gross += abs(p["qty"] * px)
net += p["qty"] * px * p["side"]
return {"positions_gross": gross, "positions_net": net}
def unrealized_pnl(self, prices: Dict[str, float]) -> float:
"""Σ side * (px - entry) * qty."""
upnl = 0.0
for t, p in self.positions.items():
px = prices.get(t, self.last_prices.get(t, p["entry"]))
if px is None or (isinstance(px, float) and math.isnan(px)): px = p["entry"]
upnl += p["side"] * (px - p["entry"]) * p["qty"]
return upnl
def equity_from_start(self, prices: Dict[str, float]) -> float:
return self.start_capital + self.realized_pnl + self.unrealized_pnl(prices)
# ---------- zamknięcia / log ----------
def _close_position(self, ticker: str, price: float, reason: str):
pos = self.positions.get(ticker)
if not pos: return
qty, entry, side = pos["qty"], pos["entry"], pos["side"]
if side == 1:
self.cash += qty * price
pnl_abs = (price - entry) * qty
else:
pnl_abs = (entry - price) * qty
if ACCOUNTING_MODE == "stock":
self.cash -= qty * price
else:
self.cash += pnl_abs # margin: tylko PnL wpływa na cash
denom = max(qty * entry, 1e-12)
pnl_pct = pnl_abs / denom
self.realized_pnl += pnl_abs
self._log_trade("SELL", ticker, price, qty, side, pnl_abs, pnl_pct, reason=reason)
del self.positions[ticker]
def _log_trade(self, action: str, ticker: str, price: float, qty: float, side: int,
pnl_abs: float = 0.0, pnl_pct: float = 0.0, reason: Optional[str] = None):
ts = time.strftime("%Y-%m-%d %H:%M:%S")
if action == "BUY":
print(f"[{ts}] BUY {ticker} @ {price:.6f}, qty={qty:.6f}, side={'LONG' if side==1 else 'SHORT'}, cash={self.cash:.2f}")
else:
r = f", reason={reason}" if reason else ""
print(f"[{ts}] SELL {ticker} @ {price:.6f}, qty={qty:.6f}, PnL={pnl_abs:+.2f} ({pnl_pct:+.2%}), cash={self.cash:.2f}{r}")
self.trade_log.append({
"time": ts, "action": action, "ticker": ticker, "price": float(price),
"qty": float(qty), "side": int(side),
"pnl_abs": float(pnl_abs), "pnl_pct": float(pnl_pct),
"realized_pnl_cum": float(self.realized_pnl), "cash_after": float(self.cash),
"reason": reason or ""
})
# ---------- główna aktualizacja ----------
def on_signals(self, sigs: List[dict]):
# normalizacja
clean: List[Dict[str, Any]] = []
for s in sigs:
if isinstance(s, dict):
px = s.get("price")
if s.get("error") is None and px is not None and not math.isnan(px):
clean.append(s)
else:
if getattr(s, "error", None) is None and not math.isnan(getattr(s, "price", float("nan"))):
clean.append({
"ticker": s.ticker, "price": s.price, "signal": s.signal, "time": s.time,
"atr": getattr(s, "atr", None)
})
# snapshot, gdy brak świeżych danych
if not clean:
vals = self._positions_values(None)
prices_map = {t: self.last_prices.get(t, p["entry"]) for t, p in self.positions.items()}
if ACCOUNTING_MODE == "stock":
account_value = self.cash + vals["positions_net"]
else:
account_value = self.equity_from_start(prices_map)
unreal = self.unrealized_pnl(prices_map)
self.history.append({
"time": time.strftime("%Y-%m-%d %H:%M:%S"),
"cash": float(self.cash),
"positions_value": float(vals["positions_gross"]),
"positions_net": float(vals["positions_net"]),
"total_value": float(account_value),
"equity_from_start": float(self.start_capital + self.realized_pnl + unreal),
"unrealized_pnl": float(unreal),
"realized_pnl_cum": float(self.realized_pnl),
"open_positions": int(len(self.positions))
})
save_json("portfolio_history.json", self.history)
return
# mapy cen
prices = {s["ticker"]: float(s["price"]) for s in clean}
self.last_prices.update(prices)
# 1) risk exits (SL/TP/TRAIL + BE)
to_close = []
for t, pos in list(self.positions.items()):
price = prices.get(t)
if price is None or math.isnan(price): continue
self._update_trailing(t, pos, price)
if pos["side"] == 1:
if price <= pos["stop"]: to_close.append((t, "SL"))
elif price >= pos["take"]: to_close.append((t, "TP"))
elif price <= pos.get("trail_stop", -float("inf")): to_close.append((t, "TRAIL"))
else:
if price >= pos["stop"]: to_close.append((t, "SL"))
elif price <= pos["take"]: to_close.append((t, "TP"))
elif price >= pos.get("trail_stop", float("inf")): to_close.append((t, "TRAIL"))
for t, reason in to_close:
price = prices.get(t)
if price is not None and not math.isnan(price) and t in self.positions:
self._close_position(t, price, reason=reason)
# 2) zamknięcie na przeciwny/HOLD
for s in clean:
t, sig = s["ticker"], int(s["signal"])
price = float(s["price"])
if t in self.positions:
pos = self.positions[t]
if sig == 0 or sig != pos["side"]:
self._close_position(t, price, reason="SIGNAL")
# 3) otwarcia ATR sizing + limity
candidates = [s for s in clean if s["ticker"] not in self.positions and int(s["signal"]) != 0]
if MAX_NEW_POSITIONS_PER_CYCLE and MAX_NEW_POSITIONS_PER_CYCLE > 0:
candidates = candidates[:MAX_NEW_POSITIONS_PER_CYCLE]
for s in candidates:
t, sig, price = s["ticker"], int(s["signal"]), float(s["price"])
atr = s.get("atr", None)
if price <= 0: continue
stop, take, trail_best, trail_stop, one_r = self._init_risk(t, price, sig, atr)
# equity teraz (dla sizingu ryzyka)
if ACCOUNTING_MODE == "margin":
equity_now = self.start_capital + self.realized_pnl + self.unrealized_pnl(self.last_prices)
else:
vals_tmp = self._positions_values(self.last_prices)
equity_now = self.cash + vals_tmp["positions_net"]
risk_amount = max(1e-6, equity_now * RISK_FRACTION)
qty_risk = risk_amount / max(one_r, 1e-8)
# limit gotówki dla LONG
per_trade_cash = max(MIN_TRADE_CASH, self.cash * ALLOC_FRACTION)
qty_cash = per_trade_cash / max(price, 1e-12) if sig == 1 else float("inf")
qty = max(0.0, min(qty_risk, qty_cash))
if qty <= 0: continue
# księgowanie gotówki przy otwarciu
if sig == 1:
cost = qty * price
if self.cash < cost: continue
self.cash -= cost
else:
if ACCOUNTING_MODE == "stock":
self.cash += qty * price
# margin: brak zmiany cash przy short open
self.positions[t] = {
"qty": qty, "entry": price, "side": sig,
"stop": stop, "take": take,
"trail_best": trail_best, "trail_stop": trail_stop,
"one_r": one_r, "be_done": False
}
self._log_trade("BUY" if sig == 1 else "SELL", t, price, qty, sig)
# 4) snapshot na bieżących/ostatnich cenach
vals = self._positions_values(self.last_prices)
prices_map = {t: self.last_prices.get(t, p["entry"]) for t, p in self.positions.items()}
if ACCOUNTING_MODE == "stock":
account_value = self.cash + vals["positions_net"]
else:
account_value = self.equity_from_start(prices_map)
unreal = self.unrealized_pnl(prices_map)
self.history.append({
"time": clean[0]["time"],
"cash": float(self.cash),
"positions_value": float(vals["positions_gross"]),
"positions_net": float(vals["positions_net"]),
"total_value": float(account_value),
"equity_from_start": float(self.start_capital + self.realized_pnl + unreal),
"unrealized_pnl": float(unreal),
"realized_pnl_cum": float(self.realized_pnl),
"open_positions": int(len(self.positions))
})
# 5) zapisy
save_json("trade_log.json", self.trade_log)
save_json("portfolio_history.json", self.history)
save_json("positions.json", [{"ticker": t, **p} for t, p in self.positions.items()])