Fájl részletek

Ezen az oldalon egy konkrét fájl aktuális állapotát tudod megnézni.

Vissza a fájltérképhez Csak változott Stratégia-labor Monitor főoldal
Fájl útvonala
/opt/bots/saturnus/app/rules_schema.py
Létezik most?
IGEN
Aktuális státusz
UNCHANGED
Méret
8434
Módosítás ideje
1774875063.2694387
Korábbi baseline időpont
1774875063.2694387
SHA256 rövid

Előnézet (első 120 sor)

import copy
from datetime import datetime, timezone


ALLOWED_SIDES = {"buy", "sell"}


class RulesValidationError(ValueError):
    pass


def utc_now_iso() -> str:
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")


def _is_number(value) -> bool:
    try:
        float(value)
        return True
    except Exception:
        return False


def _ensure_dict(value, name: str):
    if not isinstance(value, dict):
        raise RulesValidationError(f"{name} mezőnek objektumnak kell lennie.")
    return value


def _ensure_list(value, name: str):
    if not isinstance(value, list):
        raise RulesValidationError(f"{name} mezőnek listának kell lennie.")
    return value


def _validate_rule(rule: dict, section_name: str):
    _ensure_dict(rule, f"{section_name} szabály")

    required = ["id", "title", "side", "enabled", "priority", "params", "lab_only"]
    for key in required:
        if key not in rule:
            raise RulesValidationError(f"Hiányzó kötelező mező a(z) {section_name} szabálynál: {key}")

    rule_id = str(rule.get("id", "")).strip()
    if not rule_id:
        raise RulesValidationError(f"A(z) {section_name} egyik szabályának azonosítója üres.")

    side = str(rule.get("side", "")).strip().lower()
    if side not in ALLOWED_SIDES:
        raise RulesValidationError(f"A(z) {rule_id} szabály oldala csak 'buy' vagy 'sell' lehet.")

    if not isinstance(rule.get("enabled"), bool):
        raise RulesValidationError(f"A(z) {rule_id} szabály enabled mezője csak true/false lehet.")

    if not isinstance(rule.get("lab_only"), bool):
        raise RulesValidationError(f"A(z) {rule_id} szabály lab_only mezője csak true/false lehet.")

    if not isinstance(rule.get("priority"), int):
        raise RulesValidationError(f"A(z) {rule_id} szabály priority mezője csak egész szám lehet.")

    params = rule.get("params")
    _ensure_dict(params, f"{rule_id}.params")

    for key, value in params.items():
        if not _is_number(value):
            raise RulesValidationError(f"A(z) {rule_id} szabály {key} paramétere nem szám: {value}")

    for key in ["title", "goal", "description", "level_key", "level_formula_human", "system_comment", "user_comment"]:
        if key in rule and rule.get(key) is not None and not isinstance(rule.get(key), str):
            raise RulesValidationError(f"A(z) {rule_id} szabály {key} mezője csak szöveg lehet.")

    for key in ["conditions_human"]:
        if key in rule and rule.get(key) is not None:
            _ensure_list(rule.get(key), f"{rule_id}.{key}")


def _validate_rule_list(rules: list, expected_side: str, section_name: str):
    _ensure_list(rules, section_name)

    seen_ids = set()
    seen_priorities = set()

    for rule in rules:
        _validate_rule(rule, section_name)

        rule_id = str(rule["id"]).strip()
        if rule_id in seen_ids:
            raise RulesValidationError(f"Duplikált szabályazonosító a(z) {section_name} listában: {rule_id}")
        seen_ids.add(rule_id)

        if str(rule["side"]).strip().lower() != expected_side:
            raise RulesValidationError(f"A(z) {rule_id} szabály side mezője hibás, elvárt: {expected_side}")

        prio = rule["priority"]
        if prio in seen_priorities:
            raise RulesValidationError(f"Duplikált priority a(z) {section_name} listában: {prio}")
        seen_priorities.add(prio)


def validate_rules_doc(doc: dict, *, allow_lab_extra_rules: bool = False) -> dict:
    _ensure_dict(doc, "rules dokumentum")

    if int(doc.get("schema_version", 0)) != 1:
        raise RulesValidationError("A rules dokumentum schema_version mezője csak 1 lehet.")

    meta = _ensure_dict(doc.get("meta", {}), "meta")
    _ensure_dict(doc.get("definitions", {}), "definitions")
    globals_obj = _ensure_dict(doc.get("globals", {}), "globals")

    sell_hierarchy = _ensure_list(doc.get("sell_hierarchy", []), "sell_hierarchy")
    buy_hierarchy = _ensure_list(doc.get("buy_hierarchy", []), "buy_hierarchy")

    sell_rules = doc.get("sell_rules", [])
    buy_rules = doc.get("buy_rules", [])

    _validate_rule_list(sell_rules, "sell", "sell_rules")
    _validate_rule_list(buy_rules, "buy", "buy_rules")

    allow_disable = bool(globals_obj.get("allow_rule_disable", True))

Csak változott diff sorok

Teljes diff

[INFO] Nincs tartalmi eltérés a baseline és az aktuális fájl között.