Fájl részletek

Ezen az oldalon egy konkrét fájl aktuális állapotát tudod megnézni.

Vissza a fájltérképhez Csak változott Stratégia-labor Monitor főoldal
Fájl útvonala
/opt/bots/saturnus/freqtrade/user_data/strategies/SaturnusExecutor.py
Létezik most?
IGEN
Aktuális státusz
UNCHANGED
Méret
5076
Módosítás ideje
1769444696.8151994
Korábbi baseline időpont
1769444696.8151994
SHA256 rövid

Előnézet (első 120 sor)

# pragma pylint: disable=missing-docstring, invalid-name, pointless-string-statement
from __future__ import annotations

import json
import os
from datetime import datetime, timezone
from typing import Any, Dict

from freqtrade.strategy import IStrategy
from pandas import DataFrame


class SaturnusExecutor(IStrategy):
    """
    EXECUTOR ONLY STRATEGY
    - Nem számol saját logikát, nem használ indikátorokat.
    - A Saturnus app által frissített state.json alapján ad belépési/kilépési jelet.
    - Stoploss védőháló: -10%.
    """

    # --- Freqtrade kötelező / alap paraméterek ---
    timeframe = "1m"
    can_short = False

    # Védőháló (freqtrade stoploss) - ha a 6 szabályos rendszer/infra összeomlik
    stoploss = -0.10

    # Minimal ROI: ne legyen ROI miatti automata zárás
    minimal_roi = {"0": 10}

    # Trailing kikapcsolva (a logika a Saturnus app-ban van)
    trailing_stop = False

    # Csak új gyertyán értékeljen (stabilabb, determinisztikusabb)
    process_only_new_candles = True
    startup_candle_count = 10

    # --- Saturnus állományok helye ---
    # Kanonikus state.json az app oldalon
    STATE_JSON_PATH = "/opt/bots/saturnus/app/state.json"

    # Executor saját “last processed” nyilvántartása (hogy ne ismételje a jeleket)
    EXECUTOR_STATE_PATH = "/opt/bots/saturnus/freqtrade/user_data/.saturnus_executor_state.json"

    def informative_pairs(self):
        return []

    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        # Nincs indikátor.
        return dataframe

    # ----------------- helpers -----------------

    def _load_json(self, path: str) -> Dict[str, Any]:
        try:
            with open(path, "r", encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return {}
        except Exception:
            return {}

    def _save_json(self, path: str, data: Dict[str, Any]) -> None:
        tmp = path + ".tmp"
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        os.replace(tmp, path)

    def _now_iso(self) -> str:
        return datetime.now(timezone.utc).isoformat()

    def _get_pair_signal(self, state: Dict[str, Any], pair: str) -> Dict[str, Any]:
        """
        Elvárt jel formák (rugalmas, de determinisztikus):
        A) state["signals"][PAIR] = {"action":"BUY|SELL|NONE", "id":"...", "ts":"..."}
        B) state["signal"] = {"pair":"...", "action":"BUY|SELL|NONE", "id":"...", "ts":"..."}
        Ha nincs jel: NONE.
        """
        sig = {}

        signals = state.get("signals")
        if isinstance(signals, dict) and isinstance(signals.get(pair), dict):
            sig = signals.get(pair, {}) or {}

        if not sig:
            root = state.get("signal")
            if isinstance(root, dict) and root.get("pair") == pair:
                sig = root

        action = (sig.get("action") or "NONE").upper()
        sig_id = str(sig.get("id") or "")
        sig_ts = str(sig.get("ts") or "")

        return {"action": action, "id": sig_id, "ts": sig_ts}

    def _already_processed(self, pair: str, sig_id: str, action: str) -> bool:
        if not sig_id:
            return False
        ex = self._load_json(self.EXECUTOR_STATE_PATH)
        last = ex.get(pair, {})
        return (
            isinstance(last, dict)
            and last.get("id") == sig_id
            and last.get("action") == action
        )

    def _mark_processed(self, pair: str, sig_id: str, action: str) -> None:
        if not sig_id:
            return
        ex = self._load_json(self.EXECUTOR_STATE_PATH)
        if not isinstance(ex, dict):
            ex = {}
        ex[pair] = {"id": sig_id, "action": action, "processed_at": self._now_iso()}
        self._save_json(self.EXECUTOR_STATE_PATH, ex)

    # ----------------- signals -----------------

    def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        pair = metadata.get("pair", "")

Csak változott diff sorok

Teljes diff

[INFO] Nincs tartalmi eltérés a baseline és az aktuális fájl között.