Fájl részletek

Ezen az oldalon egy konkrét fájl aktuális állapotát tudod megnézni.

Vissza a fájltérképhez | Csak változott | Stratégia-labor | Monitor főoldal
Fájl útvonala
/opt/bots/saturnus/app/tools/executor_dryrun_validation.py
Létezik most?
IGEN
Aktuális státusz
UNCHANGED
Méret
6454
Módosítás ideje
1772392861.704674
Korábbi baseline időpont
1772392861.704674
SHA256 rövid

Előnézet (első 120 sor)

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
executor_dryrun_validation.py (CANONICAL TEST TOOL)

Purpose:
  Validate that Saturnus tick_runner executor wiring is SAFE in dry-run:
    A) EXECUTION_ENABLED=0 (disabled) => must NOT touch network, executed=False
    B) EXECUTION_ENABLED=1 + EXECUTION_LOG_ONLY=1 (log-only) => must NOT touch network
       (If any network attempt happens => FAIL)

This tool monkeypatches common HTTP clients (requests/httpx/urllib) to raise immediately.
If the executor path tries to call Freqtrade REST endpoints, the test will FAIL.
"""

import os
import sys
import importlib
from types import ModuleType
from typing import Any, Dict, Tuple


# --- Ensure /opt/bots/saturnus/app is importable (so "import tick_runner" works) ---
# APP_DIR resolves to the parent of the directory that contains this file.
# It is inserted at index 0 of sys.path (only when not already listed) so that
# modules living there take precedence over other sys.path entries.
APP_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if APP_DIR not in sys.path:
    sys.path.insert(0, APP_DIR)


class NetworkAttempt(Exception):
    """Raised by the patched HTTP client entry points when a call is attempted.

    The blocking stubs installed by the ``_patch_*`` helpers raise this
    exception instead of performing any request.
    """


def _patch_requests() -> int:
    """Replace the ``requests`` entry points with a stub that raises.

    Patches ``requests.request`` plus ``Session.request`` and ``Session.send``
    on the class, so that both the convenience API and prepared requests sent
    directly through ``Session.send`` raise ``NetworkAttempt``.

    Patching is best-effort: if ``requests`` cannot be imported, or any step
    fails, the error is swallowed and the patches applied so far are counted.

    Returns:
        Number of attributes that were replaced (0 when nothing was patched).
    """
    applied = 0
    try:
        import requests  # type: ignore

        def _blocked(*args, **kwargs):
            raise NetworkAttempt(f"NETWORK_BLOCKED: requests call attempted args={args} kwargs={kwargs}")

        if hasattr(requests, "request"):
            requests.request = _blocked  # type: ignore
            applied += 1

        try:
            import requests.sessions  # type: ignore

            session_cls = getattr(requests.sessions, "Session", None)
            # "send" is patched too: Session.send(prepared_request) is a public
            # entry point that does not pass through Session.request.
            for method in ("request", "send"):
                if session_cls is not None and hasattr(session_cls, method):
                    setattr(session_cls, method, _blocked)
                    applied += 1
        except Exception:
            # Best-effort: keep whatever was patched before the failure.
            pass
    except Exception:
        # Best-effort: requests is missing or unusable, nothing (more) to patch.
        pass

    return applied


def _patch_httpx() -> int:
    """Replace the ``httpx`` entry points with a stub that raises.

    Patches the module-level helpers and, on ``Client`` / ``AsyncClient``,
    both ``request`` and ``send``. ``send`` is included because the streaming
    API and prepared requests call it directly without going through
    ``request``; blocking it closes that bypass.

    Patching is best-effort: if ``httpx`` cannot be imported, or any step
    fails, the error is swallowed and the patches applied so far are counted.

    Returns:
        Number of attributes that were replaced (0 when nothing was patched).
    """
    applied = 0
    try:
        import httpx  # type: ignore

        def _blocked(*args, **kwargs):
            raise NetworkAttempt(f"NETWORK_BLOCKED: httpx call attempted args={args} kwargs={kwargs}")

        module_level = ("get", "post", "put", "patch", "delete", "head", "options", "request", "stream")
        for attr in module_level:
            if hasattr(httpx, attr):
                setattr(httpx, attr, _blocked)
                applied += 1

        for clsname in ("Client", "AsyncClient"):
            cls = getattr(httpx, clsname, None)
            if cls is None:
                continue
            for method in ("request", "send"):
                if hasattr(cls, method):
                    setattr(cls, method, _blocked)
                    applied += 1
    except Exception:
        # Best-effort: httpx is missing or unusable, nothing (more) to patch.
        pass
    return applied


def _patch_urllib() -> int:
    """Replace the ``urllib.request`` entry points with a stub that raises.

    Patches the module attribute ``urlopen`` and also ``OpenerDirector.open``.
    The latter matters because custom openers (``build_opener().open(...)``)
    and any ``urlopen`` reference bound before this patch ran would otherwise
    still reach the network.

    Patching is best-effort: any failure is swallowed and the patches applied
    so far are counted.

    Returns:
        Number of attributes that were replaced (0 when nothing was patched).
    """
    applied = 0
    try:
        import urllib.request  # type: ignore

        def _blocked(*args, **kwargs):
            raise NetworkAttempt(f"NETWORK_BLOCKED: urllib.request call attempted args={args} kwargs={kwargs}")

        if hasattr(urllib.request, "urlopen"):
            urllib.request.urlopen = _blocked  # type: ignore
            applied += 1

        opener_cls = getattr(urllib.request, "OpenerDirector", None)
        if opener_cls is not None and hasattr(opener_cls, "open"):
            # Every opener funnels requests through OpenerDirector.open.
            opener_cls.open = _blocked  # type: ignore
            applied += 1
    except Exception:
        # Best-effort: keep whatever was patched before the failure.
        pass
    return applied


def patch_network() -> int:
    """Apply the requests, httpx and urllib patches, in that order.

    Returns:
        The sum of the counts returned by the three patch helpers.
    """
    total = 0
    for patcher in (_patch_requests, _patch_httpx, _patch_urllib):
        total += patcher()
    return total


def _reload_module(modname: str) -> ModuleType:
    """Return a freshly executed copy of the module named *modname*.

    A module already present in ``sys.modules`` is reloaded in place;
    otherwise it is imported for the first time.
    """
    try:
        already_loaded = sys.modules[modname]
    except KeyError:
        return importlib.import_module(modname)
    return importlib.reload(already_loaded)


def _make_min_state_and_decision(action: str) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    state: Dict[str, Any] = {
        "schema_version": 2,
        "pair": "SOL/USDC",
        "time": None,
        "last": 100.0,
        "prev_last": 99.0,
        "execution": {
            "enabled": (os.getenv("EXECUTION_ENABLED", "0") == "1"),
            "log_only": (os.getenv("EXECUTION_LOG_ONLY", "1") == "1"),
        },
        "freqtrade": {

Csak változott diff sorok

Teljes diff

[INFO] Nincs tartalmi eltérés a baseline és az aktuális fájl között.