Ezen az oldalon egy konkrét fájl aktuális állapotát tudod megnézni.
/opt/bots/saturnus/app/tools/executor_dryrun_validation.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
executor_dryrun_validation.py (CANONICAL TEST TOOL)
Purpose:
Validate that Saturnus tick_runner executor wiring is SAFE in dry-run:
A) EXECUTION_ENABLED=0 (disabled) => must NOT touch network, executed=False
B) EXECUTION_ENABLED=1 + EXECUTION_LOG_ONLY=1 (log-only) => must NOT touch network
(If any network attempt happens => FAIL)
This tool monkeypatches common HTTP clients (requests/httpx/urllib) to raise immediately.
If the executor path tries to call Freqtrade REST endpoints, the test will FAIL.
"""
import os
import sys
import importlib
from types import ModuleType
from typing import Any, Dict, Tuple
# --- Put the parent directory of this file (/opt/bots/saturnus/app) on sys.path
# --- so that a plain "import tick_runner" resolves.
APP_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
if sys.path.count(APP_DIR) == 0:
    # Prepend so this directory wins over any same-named module further down the path.
    sys.path[:0] = [APP_DIR]
class NetworkAttempt(Exception):
    """Raised by the patched HTTP client stubs when a network call is attempted."""
def _patch_requests() -> int:
applied = 0
try:
import requests # type: ignore
def _blocked(*args, **kwargs):
raise NetworkAttempt(f"NETWORK_BLOCKED: requests call attempted args={args} kwargs={kwargs}")
if hasattr(requests, "request"):
requests.request = _blocked # type: ignore
applied += 1
try:
import requests.sessions # type: ignore
if hasattr(requests.sessions, "Session") and hasattr(requests.sessions.Session, "request"):
requests.sessions.Session.request = _blocked # type: ignore
applied += 1
except Exception:
pass
except Exception:
pass
return applied
def _patch_httpx() -> int:
applied = 0
try:
import httpx # type: ignore
def _blocked(*args, **kwargs):
raise NetworkAttempt(f"NETWORK_BLOCKED: httpx call attempted args={args} kwargs={kwargs}")
for attr in ("get", "post", "put", "delete", "request"):
if hasattr(httpx, attr):
setattr(httpx, attr, _blocked)
applied += 1
for clsname in ("Client", "AsyncClient"):
cls = getattr(httpx, clsname, None)
if cls and hasattr(cls, "request"):
cls.request = _blocked # type: ignore
applied += 1
except Exception:
pass
return applied
def _patch_urllib() -> int:
applied = 0
try:
import urllib.request # type: ignore
def _blocked(*args, **kwargs):
raise NetworkAttempt(f"NETWORK_BLOCKED: urllib.request call attempted args={args} kwargs={kwargs}")
if hasattr(urllib.request, "urlopen"):
urllib.request.urlopen = _blocked # type: ignore
applied += 1
except Exception:
pass
return applied
def patch_network() -> int:
    """Run the requests, httpx and urllib patchers in that order.

    Returns:
        The sum of the counts returned by the three patchers.
    """
    patchers = (_patch_requests, _patch_httpx, _patch_urllib)
    return sum(patcher() for patcher in patchers)
def _reload_module(modname: str) -> ModuleType:
if modname in sys.modules:
return importlib.reload(sys.modules[modname])
return importlib.import_module(modname)
def _make_min_state_and_decision(action: str) -> Tuple[Dict[str, Any], Dict[str, Any]]:
state: Dict[str, Any] = {
"schema_version": 2,
"pair": "SOL/USDC",
"time": None,
"last": 100.0,
"prev_last": 99.0,
"execution": {
"enabled": (os.getenv("EXECUTION_ENABLED", "0") == "1"),
"log_only": (os.getenv("EXECUTION_LOG_ONLY", "1") == "1"),
},
"freqtrade": {