# Ezen az oldalon egy konkrét fájl aktuális állapotát tudod megnézni.
# /opt/bots/saturnus/app/ui_app.py
from __future__ import annotations
import os
import json
import subprocess
from zoneinfo import ZoneInfo
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
from flask import Flask, jsonify, redirect, render_template
from health_bp import health_bp
from state_schema import ensure_state, validate_state
from ft_jwt_client import FreqtradeJWTClient
# JSON state file that load_state() reads.
STATE_PATH: Path = Path("/opt/bots/saturnus/state.json")

# Zone name used by now_pair() when the caller does not pass one.
TZ_DEFAULT: str = "Europe/Budapest"

# Short component name -> service unit name.
# NOTE(review): the ".service" suffix suggests systemd units — confirm
# against the code that consumes this mapping.
SERVICE_MAP: dict[str, str] = {
    "runner": "saturnus-runner.service",
    "freqtrade": "saturnus-freqtrade.service",
}
def now_pair(tz_name: str = TZ_DEFAULT) -> tuple[datetime, datetime, str]:
    """Return the current instant in a named zone and in UTC.

    Args:
        tz_name: Zone key passed to ``ZoneInfo``; defaults to ``TZ_DEFAULT``.

    Returns:
        A ``(local, utc, tz_name)`` tuple. Both datetimes are timezone-aware
        and describe the same instant; ``tz_name`` is echoed back unchanged.

    Raises:
        Whatever ``ZoneInfo`` raises for a zone key it cannot resolve.
    """
    # Resolve the zone first so an unknown key fails before the clock is read.
    zone = ZoneInfo(tz_name)
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.astimezone(zone), utc_now, tz_name
def _file_mtime_iso(path: Path) -> Optional[str]:
    """Return the modification time of *path* as a UTC ISO-8601 string.

    This is a best-effort helper: any failure (missing file, permission
    problem, an argument without ``.stat()``, ...) yields ``None`` rather
    than an exception.
    """
    stamp: Optional[str] = None
    try:
        modified_at = datetime.fromtimestamp(path.stat().st_mtime, tz=timezone.utc)
        stamp = modified_at.isoformat()
    except Exception:
        # Deliberately broad: callers only need "a timestamp or nothing".
        pass
    return stamp
def pick(*vals: Any) -> Optional[Any]:
    """Return the first argument that carries a value.

    An argument is skipped when it is ``None`` or a string that is empty
    after stripping whitespace. Other falsy values (``0``, ``False``,
    empty containers) count as real values and are returned.

    Returns:
        The first usable argument, or ``None`` when there is none.
    """

    def _is_blank(candidate: Any) -> bool:
        if candidate is None:
            return True
        return isinstance(candidate, str) and not candidate.strip()

    return next((item for item in vals if not _is_blank(item)), None)
def load_state() -> tuple[dict, bool, list[str]]:
    """Load the state file, run it through the schema helpers, and validate.

    The file at ``STATE_PATH`` is read directly instead of being probed with
    ``exists()`` first. This closes the window in which another process
    could remove the file between the check and the read. A missing file is
    treated as "no stored state": ``None`` is handed to ``ensure_state``.

    Returns:
        ``(state, ok, errors)`` where ``state`` is the result of
        ``ensure_state`` and ``(ok, errors)`` is the result of
        ``validate_state(state)``.

    Raises:
        json.JSONDecodeError: The file exists but does not contain valid JSON.
        OSError: The file could not be read for a reason other than being
            absent (e.g. permissions).
        UnicodeDecodeError: The file is not valid UTF-8.
    """
    try:
        text = STATE_PATH.read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        # Same outcome as the former `exists()` check returning False.
        raw = None
    else:
        raw = json.loads(text)

    state = ensure_state(raw)
    ok, errors = validate_state(state)
    return state, ok, errors
def build_ui(exchange: str) -> dict:
    """Build the base UI payload for *exchange*.

    Args:
        exchange: Value stored unchanged under the ``"exchange"`` key.

    Returns:
        A dict with the constant flags ``alive`` and ``ok`` (both ``True``),
        the exchange, the current time from ``now_pair()`` as ISO-8601
        strings (``time_local``, ``time_utc``), and the zone name (``tz``).
    """
    local_now, utc_now, zone_name = now_pair()

    # Keys are inserted in the same order as the serialized output expects.
    payload: dict = {"alive": True, "exchange": exchange, "ok": True}
    payload["time_local"] = local_now.isoformat()
    payload["time_utc"] = utc_now.isoformat()
    payload["tz"] = zone_name
    return payload
def ft_client() -> Optional[FreqtradeJWTClient]:
    """Create a ``FreqtradeJWTClient`` from environment settings.

    Reads ``FT_URL``, ``FT_USERNAME`` and ``FT_PASSWORD`` (each stripped of
    surrounding whitespace), falling back to ``http://127.0.0.1:8089``,
    ``saturnus`` and an empty password when a variable is unset. The client
    is created with a timeout of 8.

    Returns:
        The client, or ``None`` if constructing it raised any exception.
    """

    def _setting(name: str, fallback: str) -> str:
        # The fallback applies only when the variable is unset; a variable
        # that is set but blank yields an empty string.
        return os.getenv(name, fallback).strip()

    settings = {
        "base_url": _setting("FT_URL", "http://127.0.0.1:8089"),
        "username": _setting("FT_USERNAME", "saturnus"),
        "password": _setting("FT_PASSWORD", ""),
        "timeout": 8,
    }
    try:
        return FreqtradeJWTClient(**settings)
    except Exception:
        # Best-effort: callers treat None as "client unavailable".
        return None
def ft_snapshot() -> dict:
c = ft_client()
if not c:
return {"ok": False, "error": "ft_client_init_failed"}
snap = {
"ok": True,
"base_url": getattr(c, "base_url", None),
"user": getattr(c, "username", None),
"ping": None,
"count": None,
"status": None,
"trader_running": None,
"error": None,
}
try:
sc, body = c.get("/api/v1/ping")
snap["ping"] = {"status_code": sc, "body": body}
except Exception as e:
snap["ok"] = False
snap["error"] = f"ping_failed: {e!s}"
return snap
try:
sc, body = c.get("/api/v1/count")
snap["count"] = {"status_code": sc, "body": body}
snap["trader_running"] = bool(sc == 200)
except Exception as e:
snap["trader_running"] = False
snap["count"] = {"status_code": 0, "body": {"error": str(e)}}