Ezen az oldalon egy konkrét fájl aktuális állapotát tudod megnézni.
/opt/bots/saturnus/app/tick_runner.py
# tick_runner.py
# Saturnus – tick loop + recovery state kezelés
import time
from typing import Dict, Any
from rule_engine import decide
# =========================================================
# State kezelés
# =========================================================
def ensure_recovery_fields(state: Dict[str, Any]):
    """Make sure ``state["cycle"]`` exists and carries the recovery fields.

    Missing keys receive their neutral defaults; values that are already
    present are left untouched. ``state`` is modified in place and nothing
    is returned.
    """
    recovery_defaults = (
        ("recovery_mode", False),
        ("recovery_loss_pct", 0.0),
        ("recovery_anchor_price", None),
    )
    # Create the "cycle" sub-dict on first use, otherwise reuse the existing one.
    cycle_info = state.setdefault("cycle", {})
    for field, neutral in recovery_defaults:
        if field not in cycle_info:
            cycle_info[field] = neutral
def enter_recovery(state: Dict[str, Any], entry_price: float, exit_price: float):
    """Switch the cycle into recovery mode after a position was closed.

    Args:
        state: Bot state, modified in place. ``state["cycle"]`` must
            already exist; a missing key raises ``KeyError``.
        entry_price: Price at which the closed position was opened. It is
            stored as the recovery anchor price.
        exit_price: Price at which the position was closed.

    The loss is stored as a fraction of ``entry_price`` and clamped at 0.0,
    so an exit above the entry price never yields a negative loss.
    """
    cycle = state["cycle"]

    if entry_price == 0:
        # A zero entry price gives no base for a relative loss. Record 0.0
        # rather than letting ZeroDivisionError abort the caller.
        loss_pct = 0.0
    else:
        loss_pct = max(0.0, (entry_price - exit_price) / entry_price)

    cycle["recovery_mode"] = True
    cycle["recovery_loss_pct"] = loss_pct
    cycle["recovery_anchor_price"] = entry_price
    print(f"[RECOVERY ENTER] loss={loss_pct:.5f} anchor={entry_price}")
def exit_recovery(state: Dict[str, Any]):
    """Leave recovery mode and reset the recovery bookkeeping.

    ``state["cycle"]`` must already exist; its three recovery fields are
    reset to their neutral values in place.
    """
    state["cycle"].update(
        recovery_mode=False,
        recovery_loss_pct=0.0,
        recovery_anchor_price=None,
    )
    print("[RECOVERY EXIT]")
# =========================================================
# Trade végrehajtás (szimuláció / wrapper)
# =========================================================
def execute_trade(state: Dict[str, Any], decision: Dict[str, Any]):
    """Apply a BUY or SELL decision to ``state`` (simulation / wrapper).

    Args:
        state: Bot state, modified in place. Reads ``last`` and
            ``entry_price``; writes ``in_position``, ``entry_price`` and
            ``trough`` (SELL) or ``peak`` (BUY). The recovery helpers
            called on SELL additionally require ``state["cycle"]``.
        decision: Only ``decision.get("action")`` is read. Any action other
            than ``"BUY"`` / ``"SELL"`` leaves ``state`` unchanged.

    Recovery handling on SELL:
        * last price below the entry price -> enter recovery mode;
        * last price at or above the entry price -> leave recovery mode;
        * entry or last price unknown (``None``) -> recovery state is left
          untouched, because the outcome of the trade cannot be determined
          and must not be treated as a profitable close.
    """
    action = decision.get("action")
    last = state.get("last")

    if action == "SELL":
        entry_price = state.get("entry_price")
        # Only judge the trade when both prices are known.
        if entry_price is not None and last is not None:
            if last < entry_price:
                # Closed at a loss.
                enter_recovery(state, entry_price, last)
            else:
                # Closed without a loss: recovery is switched off.
                exit_recovery(state)
        # The position is closed regardless of how the trade turned out.
        state["in_position"] = False
        state["entry_price"] = None
        state["trough"] = last
    elif action == "BUY":
        state["in_position"] = True
        state["entry_price"] = last
        state["peak"] = last
        # Recovery mode is deliberately NOT switched off on BUY; it is only
        # left when a later SELL closes without a loss.
# =========================================================
# Tick loop
# =========================================================
def run_tick(state: Dict[str, Any]) -> Dict[str, Any]:
    """Run a single tick and return the rule engine's decision.

    The recovery fields are ensured on ``state`` first, then ``decide`` is
    consulted. A decision whose ``"action"`` is ``"BUY"`` or ``"SELL"`` is
    executed before it is returned; any other decision is returned as is.
    """
    ensure_recovery_fields(state)
    verdict = decide(state)
    if verdict["action"] not in ("BUY", "SELL"):
        # Nothing to execute for this tick.
        return verdict
    execute_trade(state, verdict)
    return verdict
# =========================================================
# Példa futtatás (debug)
# =========================================================
if __name__ == "__main__":
    # Debug driver: start flat (not in a position) with every price field
    # at 100 and an empty cycle record.
    state = dict.fromkeys(("last", "prev_last", "peak", "trough"), 100)
    state["in_position"] = False
    state["cycle"] = {}

    # Tick once per second forever, printing each decision. This loop
    # itself never feeds new prices into ``state``.
    while True:
        print(run_tick(state))
        time.sleep(1)