Fájl részletek

Ezen az oldalon egy konkrét fájl aktuális állapotát tudod megnézni.

Vissza a fájltérképhez Csak változott Stratégia-labor Monitor főoldal
Fájl útvonala
/opt/bots/saturnus/app/BAK_2026-04-04_recovery_ma_final/tick_context.py
Létezik most?
IGEN
Aktuális státusz
UNCHANGED
Méret
10503
Módosítás ideje
1775305857.700749
Korábbi baseline időpont
1775305857.700749
SHA256 rövid

Előnézet (első 120 sor)

# tick_context.py
# Saturnus – TickContext
# Feladata: state + ár alapján egységes ctx előállítása a RuleEngine-nek

from typing import Dict, Any, Optional
from datetime import datetime, timezone
import json
from pathlib import Path

ROOT_DIR = Path("/opt/bots/saturnus")
RULES_PATH = ROOT_DIR / "config" / "trading_rules.json"


def now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


def _to_float(value: Any, default: float) -> float:
    try:
        return float(value)
    except Exception:
        return float(default)


def _safe_float(value: Any) -> Optional[float]:
    try:
        if value is None:
            return None
        return float(value)
    except Exception:
        return None


def _load_trading_rules() -> dict:
    defaults = {
        "std_sell_pct": 1.0,
        "profitless_sell_pct": 0.2,
        "profitless_sell_min_pct": 0.2,
        "panic_sell_pct": 1.0,
        "std_buy_pct": 1.0,
        "profitless_buy_pct": 0.2,
        "profitless_buy_max_pct": 0.2,
        "panic_buy_pct": 1.0,
    }

    try:
        with open(RULES_PATH, "r", encoding="utf-8") as f:
            raw = json.load(f)

        if not isinstance(raw, dict):
            return defaults

        merged = defaults.copy()
        merged.update(raw)

        for key in list(defaults.keys()):
            merged[key] = _to_float(merged.get(key), defaults[key])

        return merged

    except Exception:
        return defaults


def _clamp(value: float, low: float, high: float) -> float:
    if value < low:
        return low
    if value > high:
        return high
    return value


class TickContextBuilder:
    """
    NEM dönt.
    Csak adatot készít elő a RuleEngine számára.
    """

    @staticmethod
    def build(
        *,
        pair: str,
        state: Dict[str, Any],
        last_price: float,
        in_position: bool
    ) -> Dict[str, Any]:

        rules = _load_trading_rules()

        std_sell_pct = rules["std_sell_pct"] / 100.0
        profitless_sell_pct = rules["profitless_sell_pct"] / 100.0
        profitless_sell_min_pct = rules["profitless_sell_min_pct"] / 100.0
        panic_sell_pct = rules["panic_sell_pct"] / 100.0

        std_buy_pct = rules["std_buy_pct"] / 100.0
        profitless_buy_pct = rules["profitless_buy_pct"] / 100.0
        profitless_buy_max_pct = rules["profitless_buy_max_pct"] / 100.0
        panic_buy_pct = rules["panic_buy_pct"] / 100.0

        pairs_state = state.setdefault("pairs", {})
        pstate = pairs_state.setdefault(pair, {})

        prev_last = _safe_float(pstate.get("last"))
        prev_in_position = bool(pstate.get("in_position", False))

        base = _safe_float(pstate.get("base"))
        peak = _safe_float(pstate.get("peak"))
        trough = _safe_float(pstate.get("trough"))

        if base is None:
            base = float(last_price)

        if in_position:
            if not prev_in_position:
                peak = float(last_price)
                if trough is None:
                    trough = float(last_price)

            if peak is None or float(last_price) > peak:
                peak = float(last_price)

Csak változott diff sorok

Teljes diff

[INFO] Nincs tartalmi eltérés a baseline és az aktuális fájl között.