Fájl részletek

Ezen az oldalon egy konkrét fájl aktuális állapotát tudod megnézni.

Vissza a fájltérképhez Csak változott Stratégia-labor Monitor főoldal
Fájl útvonala
/opt/bots/saturnus/app/ui_app.py
Létezik most?
IGEN
Aktuális státusz
UNCHANGED
Méret
9586
Módosítás ideje
1774638409.6027713
Korábbi baseline időpont
1774638409.6027713
SHA256 rövid

Előnézet (első 120 sor)

from __future__ import annotations

import os
import json
import subprocess
from zoneinfo import ZoneInfo
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional

from flask import Flask, jsonify, redirect, render_template

from health_bp import health_bp
from state_schema import ensure_state, validate_state
from ft_jwt_client import FreqtradeJWTClient


STATE_PATH = Path("/opt/bots/saturnus/state.json")
TZ_DEFAULT = "Europe/Budapest"

SERVICE_MAP = {
    "runner": "saturnus-runner.service",
    "freqtrade": "saturnus-freqtrade.service",
}


def now_pair(tz_name: str = TZ_DEFAULT) -> tuple[datetime, datetime, str]:
    tz = ZoneInfo(tz_name)
    dt_utc = datetime.now(timezone.utc)
    dt_local = dt_utc.astimezone(tz)
    return dt_local, dt_utc, tz_name


def _file_mtime_iso(path: Path) -> Optional[str]:
    try:
        ts = path.stat().st_mtime
        return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()
    except Exception:
        return None


def pick(*vals: Any) -> Optional[Any]:
    for v in vals:
        if v is None:
            continue
        if isinstance(v, str) and v.strip() == "":
            continue
        return v
    return None


def load_state() -> tuple[dict, bool, list[str]]:
    raw = None
    if STATE_PATH.exists():
        raw = json.loads(STATE_PATH.read_text(encoding="utf-8"))

    state = ensure_state(raw)
    ok, errors = validate_state(state)
    return state, ok, errors


def build_ui(exchange: str) -> dict:
    dt_local, dt_utc, tz_name = now_pair()
    return {
        "alive": True,
        "exchange": exchange,
        "ok": True,
        "time_local": dt_local.isoformat(),
        "time_utc": dt_utc.isoformat(),
        "tz": tz_name,
    }


def ft_client() -> Optional[FreqtradeJWTClient]:
    base_url = os.getenv("FT_URL", "http://127.0.0.1:8089").strip()
    user = os.getenv("FT_USERNAME", "saturnus").strip()
    pwd = os.getenv("FT_PASSWORD", "").strip()

    try:
        return FreqtradeJWTClient(
            base_url=base_url,
            username=user,
            password=pwd,
            timeout=8,
        )
    except Exception:
        return None


def ft_snapshot() -> dict:
    c = ft_client()
    if not c:
        return {"ok": False, "error": "ft_client_init_failed"}

    snap = {
        "ok": True,
        "base_url": getattr(c, "base_url", None),
        "user": getattr(c, "username", None),
        "ping": None,
        "count": None,
        "status": None,
        "trader_running": None,
        "error": None,
    }

    try:
        sc, body = c.get("/api/v1/ping")
        snap["ping"] = {"status_code": sc, "body": body}
    except Exception as e:
        snap["ok"] = False
        snap["error"] = f"ping_failed: {e!s}"
        return snap

    try:
        sc, body = c.get("/api/v1/count")
        snap["count"] = {"status_code": sc, "body": body}
        snap["trader_running"] = bool(sc == 200)
    except Exception as e:
        snap["trader_running"] = False
        snap["count"] = {"status_code": 0, "body": {"error": str(e)}}

Csak változott diff sorok

Teljes diff

[INFO] Nincs tartalmi eltérés a baseline és az aktuális fájl között.