Fájl részletek

Ezen az oldalon egy konkrét fájl aktuális állapotát tudod megnézni.

Vissza a fájltérképhez Csak változott Stratégia-labor Monitor főoldal
Fájl útvonala
/opt/bots/saturnus/app/tick_runner.py
Létezik most?
IGEN
Aktuális státusz
UNCHANGED
Méret
2908
Módosítás ideje
1776011347.683918
Korábbi baseline időpont
1776011347.683918
SHA256 rövid

Előnézet (első 120 sor)

# tick_runner.py
# Saturnus – tick loop + recovery state kezelés

import time
from typing import Dict, Any

from rule_engine import decide


# =========================================================
# State kezelés
# =========================================================

def ensure_recovery_fields(state: Dict[str, Any]):
    cycle = state.setdefault("cycle", {})

    cycle.setdefault("recovery_mode", False)
    cycle.setdefault("recovery_loss_pct", 0.0)
    cycle.setdefault("recovery_anchor_price", None)


def enter_recovery(state: Dict[str, Any], entry_price: float, exit_price: float):
    cycle = state["cycle"]

    loss_pct = max(0.0, (entry_price - exit_price) / entry_price)

    cycle["recovery_mode"] = True
    cycle["recovery_loss_pct"] = loss_pct
    cycle["recovery_anchor_price"] = entry_price

    print(f"[RECOVERY ENTER] loss={loss_pct:.5f} anchor={entry_price}")


def exit_recovery(state: Dict[str, Any]):
    cycle = state["cycle"]

    cycle["recovery_mode"] = False
    cycle["recovery_loss_pct"] = 0.0
    cycle["recovery_anchor_price"] = None

    print("[RECOVERY EXIT]")


# =========================================================
# Trade végrehajtás (szimuláció / wrapper)
# =========================================================

def execute_trade(state: Dict[str, Any], decision: Dict[str, Any]):
    action = decision.get("action")
    last = state.get("last")

    if action == "SELL":
        entry_price = state.get("entry_price")

        # veszteség vizsgálat
        if entry_price is not None and last is not None and last < entry_price:
            enter_recovery(state, entry_price, last)
        else:
            # profit esetén recovery OFF
            exit_recovery(state)

        state["in_position"] = False
        state["entry_price"] = None
        state["trough"] = last

    elif action == "BUY":
        state["in_position"] = True
        state["entry_price"] = last
        state["peak"] = last

        # recovery módot NEM kapcsoljuk ki BUY-nál
        # csak akkor lépünk ki recovery-ből, ha profitban zárunk SELL-nél


# =========================================================
# Tick loop
# =========================================================

def run_tick(state: Dict[str, Any]) -> Dict[str, Any]:
    ensure_recovery_fields(state)

    decision = decide(state)

    if decision["action"] in ("BUY", "SELL"):
        execute_trade(state, decision)

    return decision


# =========================================================
# Példa futtatás (debug)
# =========================================================

if __name__ == "__main__":
    state = {
        "last": 100,
        "prev_last": 100,
        "peak": 100,
        "trough": 100,
        "in_position": False,
        "cycle": {},
    }

    while True:
        decision = run_tick(state)
        print(decision)
        time.sleep(1)

Csak változott diff sorok

Teljes diff

[INFO] Nincs tartalmi eltérés a baseline és az aktuális fájl között.