Ezen az oldalon egy konkrét fájl aktuális állapotát tudod megnézni.
/opt/bots/saturnus/app/app.pyimport json
import os
import freqtrade_ui
import tempfile
import hashlib
import subprocess
from datetime import datetime, timezone
from flask import Flask, jsonify, render_template, Response, redirect, request
from rules_loader import (
load_rules_doc,
save_rules_doc,
load_lab_override_doc,
save_lab_override_doc,
list_lab_profiles,
)
from rules_merge import build_effective_rules
APP_DIR = os.path.dirname(os.path.abspath(__file__))
ROOT_DIR = os.path.abspath(os.path.join(APP_DIR, ".."))
STATE_PATH = os.path.join(ROOT_DIR, "state.json")
FREQTRADE_CONFIG_PATH = os.path.join(ROOT_DIR, "freqtrade", "user_data", "config.json")
SETTINGS_ENV_FILE = "/etc/systemd/system/saturnus-runner.service.d/30-canonical-env.conf"
POLL_INTERVAL_MS = 15000
app = Flask(__name__, template_folder=os.path.join(APP_DIR, "templates"))
def utc_now_iso() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
def local_time_str() -> str:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def read_json(path: str, default: dict) -> dict:
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return default
def atomic_write_json(path: str, obj) -> None:
dirn = os.path.dirname(path) or "."
base = os.path.basename(path)
fd, tmp_path = tempfile.mkstemp(prefix=base + ".", suffix=".tmp", dir=dirn)
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(obj, f, ensure_ascii=False, indent=2)
f.flush()
os.fsync(f.fileno())
os.replace(tmp_path, path)
finally:
try:
os.unlink(tmp_path)
except FileNotFoundError:
pass
def resolve_pair(state: dict) -> str:
candidates = [
state.get("pair"),
(state.get("market") or {}).get("pair"),
(state.get("prices") or {}).get("pair"),
(state.get("meta") or {}).get("pair"),
]
ex = state.get("execution") or {}
last_call = ex.get("last_call") or {}
last_call_resp = last_call.get("response") or {}
candidates.append(last_call_resp.get("pair"))
raw = last_call_resp.get("raw")
if isinstance(raw, list) and raw and isinstance(raw[0], dict):
candidates.append(raw[0].get("pair"))
last_confirm = ex.get("last_confirm") or {}
last_confirm_resp = last_confirm.get("response") or {}
candidates.append(last_confirm_resp.get("pair"))
raw2 = last_confirm_resp.get("raw")
if isinstance(raw2, list) and raw2 and isinstance(raw2[0], dict):
candidates.append(raw2[0].get("pair"))
for v in candidates:
if isinstance(v, str) and v.strip():
return v.strip()
return "SOL/USDC"
def _ensure_dict(state: dict, key: str) -> bool:
if key not in state or not isinstance(state.get(key), dict):
state[key] = {}
return True
return False
def _sync_flat_from_nested(state: dict, flat_key: str, nested: dict, nested_key: str) -> bool:
if nested is None:
return False
if nested_key not in nested:
return False
val = nested.get(nested_key)
if val is None:
return False
if state.get(flat_key) != val:
state[flat_key] = val
return True
return False
def ensure_state_schema(state: dict) -> dict: