P0 修复: - runtimeStore: 添加缺失的 lastDayHistory 字段 - Gateway/RuntimeService: 状态同步改为内存优先,消除 glob 竞态 - App.jsx: 从 3075 行重构到 ~500 行,提取 8 个独立文件 P1 修复: - CORS: 4 个服务改为从环境变量读取允许 origins - MarketStore: 改为模块级单例模式 - Domain 层: 删除 trading thin wrapper,保留 news 真实逻辑 - 测试: 补齐 77 个 gateway/runtime 测试 新增文件: - backend/tests/test_gateway.py (43 tests) - frontend/src/hooks/useWebSocketHandler.js - frontend/src/hooks/useStockRequestCallbacks.js - frontend/src/hooks/useAgentCallbacks.js - frontend/src/hooks/useRuntimeCallbacks.js - frontend/src/hooks/useWatchlistCallbacks.js - frontend/src/components/TickerBar.jsx - frontend/src/components/HeaderRight.jsx - frontend/src/components/ChartTabs.jsx Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
605 lines
21 KiB
Python
605 lines
21 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""Tests for the extracted runtime service app surface."""
|
|
|
|
import asyncio
|
|
import json
|
|
from unittest.mock import MagicMock
|
|
|
|
import pytest
|
|
from fastapi import HTTPException
|
|
from fastapi.testclient import TestClient
|
|
|
|
from backend.api import runtime as runtime_module
|
|
from backend.apps.runtime_service import create_app
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def reset_runtime_module_state():
|
|
"""Reset module-level runtime_manager before each test."""
|
|
runtime_module.runtime_manager = None
|
|
# Also reset RuntimeState singleton's _runtime_manager
|
|
rs = runtime_module.get_runtime_state()
|
|
rs._runtime_manager = None
|
|
yield
|
|
runtime_module.runtime_manager = None
|
|
rs = runtime_module.get_runtime_state()
|
|
rs._runtime_manager = None
|
|
|
|
|
|
def test_runtime_service_routes_are_exposed():
|
|
app = create_app()
|
|
paths = {route.path for route in app.routes}
|
|
|
|
assert "/health" in paths
|
|
assert "/api/status" in paths
|
|
assert "/api/runtime/start" in paths
|
|
assert "/api/runtime/stop" in paths
|
|
assert "/api/runtime/current" in paths
|
|
assert "/api/runtime/gateway/port" in paths
|
|
|
|
|
|
def test_runtime_service_health_and_status(monkeypatch):
|
|
runtime_state = runtime_module.get_runtime_state()
|
|
runtime_state.gateway_process = None
|
|
runtime_state.gateway_port = 9876
|
|
runtime_state.runtime_manager = object()
|
|
|
|
with TestClient(create_app()) as client:
|
|
health_response = client.get("/health")
|
|
status_response = client.get("/api/status")
|
|
|
|
assert health_response.status_code == 200
|
|
assert health_response.json() == {
|
|
"status": "healthy",
|
|
"service": "runtime-service",
|
|
"gateway_running": False,
|
|
"gateway_port": 9876,
|
|
}
|
|
assert status_response.status_code == 200
|
|
assert status_response.json() == {
|
|
"status": "operational",
|
|
"service": "runtime-service",
|
|
"runtime": {
|
|
"gateway_running": False,
|
|
"gateway_port": 9876,
|
|
"has_runtime_manager": True,
|
|
},
|
|
}
|
|
|
|
|
|
def test_runtime_service_gateway_port_endpoint_uses_runtime_router(monkeypatch):
|
|
runtime_module.get_runtime_state().gateway_port = 9345
|
|
monkeypatch.setattr(runtime_module, "_is_gateway_running", lambda: True)
|
|
|
|
with TestClient(create_app()) as client:
|
|
response = client.get(
|
|
"/api/runtime/gateway/port",
|
|
headers={"host": "runtime.example:8003", "x-forwarded-proto": "https"},
|
|
)
|
|
|
|
assert response.status_code == 200
|
|
assert response.json() == {
|
|
"port": 9345,
|
|
"is_running": True,
|
|
"ws_url": "wss://runtime.example:9345",
|
|
}
|
|
|
|
|
|
def test_runtime_service_get_runtime_config(monkeypatch, tmp_path):
|
|
run_dir = tmp_path / "runs" / "demo"
|
|
state_dir = run_dir / "state"
|
|
state_dir.mkdir(parents=True)
|
|
(run_dir / "BOOTSTRAP.md").write_text(
|
|
"---\n"
|
|
"tickers:\n"
|
|
" - AAPL\n"
|
|
"schedule_mode: intraday\n"
|
|
"interval_minutes: 30\n"
|
|
"trigger_time: '10:00'\n"
|
|
"max_comm_cycles: 3\n"
|
|
"enable_memory: true\n"
|
|
"---\n",
|
|
encoding="utf-8",
|
|
)
|
|
(state_dir / "runtime_state.json").write_text(
|
|
json.dumps(
|
|
{
|
|
"context": {
|
|
"config_name": "demo",
|
|
"run_dir": str(run_dir),
|
|
"bootstrap_values": {
|
|
"tickers": ["AAPL"],
|
|
"schedule_mode": "intraday",
|
|
"interval_minutes": 30,
|
|
"trigger_time": "10:00",
|
|
"max_comm_cycles": 3,
|
|
"enable_memory": True,
|
|
},
|
|
}
|
|
}
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
monkeypatch.setattr(runtime_module, "PROJECT_ROOT", tmp_path)
|
|
monkeypatch.setattr(runtime_module, "_is_gateway_running", lambda: True)
|
|
runtime_module.get_runtime_state().gateway_port = 8765
|
|
|
|
with TestClient(create_app()) as client:
|
|
response = client.get("/api/runtime/config")
|
|
|
|
assert response.status_code == 200
|
|
payload = response.json()
|
|
assert payload["run_id"] == "demo"
|
|
assert payload["bootstrap"]["schedule_mode"] == "intraday"
|
|
assert payload["resolved"]["interval_minutes"] == 30
|
|
assert payload["resolved"]["enable_memory"] is True
|
|
|
|
|
|
def test_runtime_service_update_runtime_config_persists_bootstrap(monkeypatch, tmp_path):
|
|
run_dir = tmp_path / "runs" / "demo"
|
|
state_dir = run_dir / "state"
|
|
state_dir.mkdir(parents=True)
|
|
(run_dir / "BOOTSTRAP.md").write_text(
|
|
"---\n"
|
|
"tickers:\n"
|
|
" - AAPL\n"
|
|
"schedule_mode: daily\n"
|
|
"interval_minutes: 60\n"
|
|
"trigger_time: '09:30'\n"
|
|
"max_comm_cycles: 2\n"
|
|
"---\n",
|
|
encoding="utf-8",
|
|
)
|
|
(state_dir / "runtime_state.json").write_text(
|
|
json.dumps(
|
|
{
|
|
"context": {
|
|
"config_name": "demo",
|
|
"run_dir": str(run_dir),
|
|
"bootstrap_values": {
|
|
"tickers": ["AAPL"],
|
|
"schedule_mode": "daily",
|
|
"interval_minutes": 60,
|
|
"trigger_time": "09:30",
|
|
"max_comm_cycles": 2,
|
|
},
|
|
}
|
|
}
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
class _DummyContext:
|
|
def __init__(self, run_dir):
|
|
self.config_name = "demo"
|
|
self.run_dir = run_dir
|
|
self.bootstrap_values = {
|
|
"tickers": ["AAPL"],
|
|
"schedule_mode": "daily",
|
|
"interval_minutes": 60,
|
|
"trigger_time": "09:30",
|
|
"max_comm_cycles": 2,
|
|
}
|
|
|
|
class _DummyManager:
|
|
def __init__(self):
|
|
self.config_name = "demo"
|
|
self.bootstrap = dict(_DummyContext(run_dir).bootstrap_values)
|
|
self.context = _DummyContext(run_dir)
|
|
|
|
def build_snapshot(self):
|
|
return {
|
|
"context": {
|
|
"config_name": self.context.config_name,
|
|
"run_dir": str(self.context.run_dir),
|
|
"bootstrap_values": self.context.bootstrap_values,
|
|
}
|
|
}
|
|
|
|
def _persist_snapshot(self):
|
|
return None
|
|
|
|
monkeypatch.setattr(runtime_module, "PROJECT_ROOT", tmp_path)
|
|
monkeypatch.setattr(runtime_module, "_is_gateway_running", lambda: True)
|
|
runtime_module.get_runtime_state().runtime_manager = _DummyManager()
|
|
runtime_module.get_runtime_state().gateway_port = 8765
|
|
|
|
with TestClient(create_app()) as client:
|
|
response = client.put(
|
|
"/api/runtime/config",
|
|
json={
|
|
"schedule_mode": "intraday",
|
|
"interval_minutes": 15,
|
|
"trigger_time": "10:15",
|
|
"max_comm_cycles": 4,
|
|
},
|
|
)
|
|
|
|
assert response.status_code == 200
|
|
payload = response.json()
|
|
assert payload["bootstrap"]["schedule_mode"] == "intraday"
|
|
assert payload["resolved"]["interval_minutes"] == 15
|
|
assert "interval_minutes: 15" in (run_dir / "BOOTSTRAP.md").read_text(encoding="utf-8")
|
|
|
|
|
|
# =============================================================================
|
|
# RuntimeState singleton unit tests
|
|
# =============================================================================
|
|
|
|
def test_runtime_state_is_singleton():
|
|
"""RuntimeState.__new__ returns the same instance across calls."""
|
|
state1 = runtime_module.RuntimeState()
|
|
state2 = runtime_module.RuntimeState()
|
|
assert state1 is state2
|
|
|
|
|
|
def test_runtime_state_get_runtime_state_returns_same_instance():
|
|
"""get_runtime_state() returns the module singleton."""
|
|
instance = runtime_module.get_runtime_state()
|
|
assert instance is runtime_module._runtime_state
|
|
|
|
|
|
def test_runtime_state_default_values():
|
|
"""RuntimeState initializes with sensible defaults on first instantiation."""
|
|
# Reset singleton to get fresh __init__ values
|
|
runtime_module.RuntimeState._instance = None
|
|
runtime_module.RuntimeState._lock = asyncio.Lock()
|
|
state = runtime_module.RuntimeState()
|
|
assert state._runtime_manager is None
|
|
assert state._gateway_process is None
|
|
assert state._gateway_port == 8765
|
|
|
|
|
|
def test_runtime_state_gateway_port_property():
|
|
"""gateway_port property getter and setter work correctly."""
|
|
runtime_module.RuntimeState._instance = None
|
|
runtime_module.RuntimeState._lock = asyncio.Lock()
|
|
state = runtime_module.RuntimeState()
|
|
state.gateway_port = 9999
|
|
assert state.gateway_port == 9999
|
|
state.gateway_port = 1234
|
|
assert state.gateway_port == 1234
|
|
|
|
|
|
def test_runtime_state_gateway_process_property():
|
|
"""gateway_process property getter and setter work correctly."""
|
|
runtime_module.RuntimeState._instance = None
|
|
runtime_module.RuntimeState._lock = asyncio.Lock()
|
|
state = runtime_module.RuntimeState()
|
|
assert state.gateway_process is None
|
|
|
|
fake_process = object()
|
|
state.gateway_process = fake_process
|
|
assert state.gateway_process is fake_process
|
|
|
|
state.gateway_process = None
|
|
assert state.gateway_process is None
|
|
|
|
|
|
def test_runtime_state_runtime_manager_property():
|
|
"""runtime_manager property getter and setter work correctly."""
|
|
runtime_module.RuntimeState._instance = None
|
|
runtime_module.RuntimeState._lock = asyncio.Lock()
|
|
state = runtime_module.RuntimeState()
|
|
assert state.runtime_manager is None
|
|
|
|
fake_manager = object()
|
|
state.runtime_manager = fake_manager
|
|
assert state.runtime_manager is fake_manager
|
|
|
|
state.runtime_manager = None
|
|
assert state.runtime_manager is None
|
|
|
|
|
|
@pytest.mark.filterwarnings("ignore::RuntimeWarning")
|
|
def test_runtime_state_lock_property_is_async():
|
|
"""lock is an async property that returns a coroutine producing an asyncio.Lock."""
|
|
runtime_module.RuntimeState._instance = None
|
|
runtime_module.RuntimeState._lock = asyncio.Lock()
|
|
state = runtime_module.RuntimeState()
|
|
lock_coro = state.lock
|
|
assert asyncio.iscoroutine(lock_coro)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_runtime_state_async_set_get_gateway_port():
|
|
"""Async setters and getters for gateway_port with lock protection."""
|
|
runtime_module.RuntimeState._instance = None
|
|
runtime_module.RuntimeState._lock = asyncio.Lock()
|
|
state = runtime_module.RuntimeState()
|
|
await state.set_gateway_port(8888)
|
|
assert await state.get_gateway_port() == 8888
|
|
await state.set_gateway_port(7777)
|
|
assert await state.get_gateway_port() == 7777
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_runtime_state_async_set_get_gateway_process():
|
|
"""Async setters and getters for gateway_process with lock protection."""
|
|
runtime_module.RuntimeState._instance = None
|
|
runtime_module.RuntimeState._lock = asyncio.Lock()
|
|
state = runtime_module.RuntimeState()
|
|
await state.set_gateway_process(None)
|
|
assert await state.get_gateway_process() is None
|
|
|
|
fake_process = object()
|
|
await state.set_gateway_process(fake_process)
|
|
assert await state.get_gateway_process() is fake_process
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_runtime_state_async_set_get_runtime_manager():
|
|
"""Async setters and getters for runtime_manager with lock protection."""
|
|
runtime_module.RuntimeState._instance = None
|
|
runtime_module.RuntimeState._lock = asyncio.Lock()
|
|
state = runtime_module.RuntimeState()
|
|
await state.set_runtime_manager(None)
|
|
assert await state.get_runtime_manager() is None
|
|
|
|
fake_manager = object()
|
|
await state.set_runtime_manager(fake_manager)
|
|
assert await state.get_runtime_manager() is fake_manager
|
|
|
|
|
|
# =============================================================================
|
|
# _is_gateway_running helper tests
|
|
# =============================================================================
|
|
|
|
def test_is_gateway_running_returns_false_when_process_is_none():
|
|
"""_is_gateway_running returns False when gateway_process is None."""
|
|
runtime_module.RuntimeState._instance = None
|
|
runtime_module.RuntimeState._lock = asyncio.Lock()
|
|
new_state = runtime_module.RuntimeState()
|
|
new_state._gateway_process = None
|
|
runtime_module._runtime_state = new_state
|
|
assert runtime_module._is_gateway_running() is False
|
|
|
|
|
|
def test_is_gateway_running_returns_false_when_process_exited():
|
|
"""_is_gateway_running returns False when process has terminated."""
|
|
runtime_module.RuntimeState._instance = None
|
|
runtime_module.RuntimeState._lock = asyncio.Lock()
|
|
state = runtime_module.RuntimeState()
|
|
runtime_module._runtime_state = state
|
|
|
|
mock_process = MagicMock()
|
|
mock_process.poll.return_value = 1 # non-None = process has exited
|
|
|
|
state._gateway_process = mock_process
|
|
assert runtime_module._is_gateway_running() is False
|
|
|
|
|
|
def test_is_gateway_running_returns_true_when_process_running():
|
|
"""_is_gateway_running returns True when process is alive."""
|
|
runtime_module.RuntimeState._instance = None
|
|
runtime_module.RuntimeState._lock = asyncio.Lock()
|
|
state = runtime_module.RuntimeState()
|
|
runtime_module._runtime_state = state
|
|
|
|
mock_process = MagicMock()
|
|
mock_process.poll.return_value = None # None = still running
|
|
|
|
state._gateway_process = mock_process
|
|
assert runtime_module._is_gateway_running() is True
|
|
|
|
|
|
# =============================================================================
|
|
# _stop_gateway helper tests
|
|
# =============================================================================
|
|
|
|
def test_stop_gateway_returns_false_when_no_process():
|
|
"""_stop_gateway returns False if no gateway process exists."""
|
|
runtime_module.RuntimeState._instance = None
|
|
runtime_module.RuntimeState._lock = asyncio.Lock()
|
|
new_state = runtime_module.RuntimeState()
|
|
new_state._gateway_process = None
|
|
runtime_module._runtime_state = new_state
|
|
|
|
result = runtime_module._stop_gateway()
|
|
assert result is False
|
|
|
|
|
|
def test_stop_gateway_sets_process_to_none_after_stop():
|
|
"""_stop_gateway sets _gateway_process to None after stopping."""
|
|
runtime_module.RuntimeState._instance = None
|
|
runtime_module.RuntimeState._lock = asyncio.Lock()
|
|
state = runtime_module.RuntimeState()
|
|
runtime_module._runtime_state = state
|
|
|
|
mock_process = MagicMock()
|
|
mock_process.poll.return_value = None
|
|
mock_process.wait.return_value = 0
|
|
|
|
state._gateway_process = mock_process
|
|
|
|
result = runtime_module._stop_gateway()
|
|
|
|
assert result is True
|
|
assert state._gateway_process is None
|
|
mock_process.terminate.assert_called_once()
|
|
mock_process.wait.assert_called_once()
|
|
|
|
|
|
def test_stop_gateway_kills_when_terminate_times_out():
|
|
"""_stop_gateway kills the process if terminate times out."""
|
|
import subprocess
|
|
runtime_module.RuntimeState._instance = None
|
|
runtime_module.RuntimeState._lock = asyncio.Lock()
|
|
state = runtime_module.RuntimeState()
|
|
runtime_module._runtime_state = state
|
|
|
|
mock_process = MagicMock()
|
|
mock_process.poll.return_value = None
|
|
mock_process.wait.side_effect = subprocess.TimeoutExpired("cmd", 5)
|
|
mock_process.kill.return_value = None
|
|
|
|
state._gateway_process = mock_process
|
|
|
|
result = runtime_module._stop_gateway()
|
|
|
|
assert result is True
|
|
assert state._gateway_process is None
|
|
mock_process.kill.assert_called_once()
|
|
|
|
|
|
# =============================================================================
|
|
# _build_gateway_ws_url helper tests
|
|
# =============================================================================
|
|
|
|
def test_build_gateway_ws_url_defaults_to_ws():
|
|
from fastapi import Request
|
|
|
|
mock_request = MagicMock(spec=Request)
|
|
mock_request.headers.get.side_effect = lambda k, d="": d
|
|
mock_request.url.scheme = "http"
|
|
mock_request.url.hostname = "localhost"
|
|
|
|
url = runtime_module._build_gateway_ws_url(mock_request, 8765)
|
|
assert url == "ws://localhost:8765"
|
|
|
|
|
|
def test_build_gateway_ws_url_uses_wss_for_https():
|
|
from fastapi import Request
|
|
|
|
mock_request = MagicMock(spec=Request)
|
|
mock_request.headers.get.side_effect = lambda k, d="": d
|
|
mock_request.url.scheme = "https"
|
|
mock_request.url.hostname = "example.com"
|
|
|
|
url = runtime_module._build_gateway_ws_url(mock_request, 8765)
|
|
assert url == "wss://example.com:8765"
|
|
|
|
|
|
def test_build_gateway_ws_url_respects_forwarded_proto():
|
|
from fastapi import Request
|
|
|
|
mock_request = MagicMock(spec=Request)
|
|
def header_get(key, default=""):
|
|
if key == "x-forwarded-proto":
|
|
return "https"
|
|
return default
|
|
mock_request.headers.get.side_effect = header_get
|
|
mock_request.url.scheme = "http"
|
|
mock_request.url.hostname = "internal.example"
|
|
|
|
url = runtime_module._build_gateway_ws_url(mock_request, 8765)
|
|
assert url == "wss://internal.example:8765"
|
|
|
|
|
|
def test_build_gateway_ws_url_respects_forwarded_host():
|
|
from fastapi import Request
|
|
|
|
mock_request = MagicMock(spec=Request)
|
|
mock_request.headers.get.side_effect = lambda k, d="": {
|
|
"x-forwarded-host": "external.example.com"
|
|
}.get(k, d)
|
|
mock_request.url.scheme = "http"
|
|
mock_request.url.hostname = "internal.example"
|
|
|
|
url = runtime_module._build_gateway_ws_url(mock_request, 8765)
|
|
assert url == "ws://external.example.com:8765"
|
|
|
|
|
|
# =============================================================================
|
|
# _normalize_runtime_config_updates tests
|
|
# =============================================================================
|
|
|
|
|
|
def test_normalize_runtime_config_updates_validates_schedule_mode():
|
|
req = runtime_module.UpdateRuntimeConfigRequest(schedule_mode="invalid")
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
runtime_module._normalize_runtime_config_updates(req)
|
|
assert "schedule_mode" in str(exc_info.value.detail).lower()
|
|
|
|
|
|
def test_normalize_runtime_config_updates_validates_schedule_mode_values():
|
|
for invalid in ["weekly", "monthly", "once"]:
|
|
req = runtime_module.UpdateRuntimeConfigRequest(schedule_mode=invalid)
|
|
with pytest.raises(HTTPException):
|
|
runtime_module._normalize_runtime_config_updates(req)
|
|
|
|
|
|
def test_normalize_runtime_config_updates_accepts_daily_and_intraday():
|
|
for valid in ["daily", "intraday", "DAILY", "IntraDay"]:
|
|
req = runtime_module.UpdateRuntimeConfigRequest(schedule_mode=valid)
|
|
result = runtime_module._normalize_runtime_config_updates(req)
|
|
assert "schedule_mode" in result
|
|
|
|
|
|
def test_normalize_runtime_config_updates_validates_trigger_time_format():
|
|
req = runtime_module.UpdateRuntimeConfigRequest(trigger_time="25:99")
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
runtime_module._normalize_runtime_config_updates(req)
|
|
assert "trigger_time" in str(exc_info.value.detail).lower()
|
|
|
|
|
|
def test_normalize_runtime_config_updates_accepts_now_trigger_time():
|
|
req = runtime_module.UpdateRuntimeConfigRequest(trigger_time="now")
|
|
result = runtime_module._normalize_runtime_config_updates(req)
|
|
assert result["trigger_time"] == "now"
|
|
|
|
|
|
def test_normalize_runtime_config_updates_defaults_empty_trigger_time():
|
|
req = runtime_module.UpdateRuntimeConfigRequest(trigger_time=" ")
|
|
result = runtime_module._normalize_runtime_config_updates(req)
|
|
assert result["trigger_time"] == "09:30"
|
|
|
|
|
|
def test_normalize_runtime_config_updates_rejects_no_updates():
|
|
req = runtime_module.UpdateRuntimeConfigRequest()
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
runtime_module._normalize_runtime_config_updates(req)
|
|
assert "no runtime config updates" in str(exc_info.value.detail).lower()
|
|
|
|
|
|
def test_normalize_runtime_config_updates_coerces_types():
|
|
req = runtime_module.UpdateRuntimeConfigRequest(
|
|
schedule_mode="intraday",
|
|
interval_minutes="30", # string from JSON
|
|
initial_cash="50000.0", # string from JSON
|
|
margin_requirement="0.25",
|
|
)
|
|
result = runtime_module._normalize_runtime_config_updates(req)
|
|
assert result["schedule_mode"] == "intraday"
|
|
assert result["interval_minutes"] == 30
|
|
assert result["initial_cash"] == 50000.0
|
|
assert result["margin_requirement"] == 0.25
|
|
|
|
|
|
# =============================================================================
|
|
# register_runtime_manager / unregister_runtime_manager tests
|
|
# =============================================================================
|
|
|
|
def test_register_runtime_manager_sets_module_and_singleton():
|
|
runtime_module._runtime_state._initialized = True # prevent re-init
|
|
fake_manager = object()
|
|
|
|
runtime_module.register_runtime_manager(fake_manager)
|
|
|
|
assert runtime_module.runtime_manager is fake_manager
|
|
assert runtime_module._runtime_state.runtime_manager is fake_manager
|
|
|
|
|
|
def test_unregister_runtime_manager_clears_module_and_singleton():
|
|
runtime_module._runtime_state._initialized = True # prevent re-init
|
|
runtime_module._runtime_state.runtime_manager = object()
|
|
runtime_module.runtime_manager = object()
|
|
|
|
runtime_module.unregister_runtime_manager()
|
|
|
|
assert runtime_module.runtime_manager is None
|
|
assert runtime_module._runtime_state.runtime_manager is None
|
|
|
|
|
|
# =============================================================================
|
|
# _generate_run_id tests
|
|
# =============================================================================
|
|
|
|
def test_generate_run_id_returns_timestamp_format():
|
|
run_id = runtime_module._generate_run_id()
|
|
# Format: YYYYMMDD_HHMMSS - length is 15
|
|
assert len(run_id) == 15
|
|
assert run_id[8] == "_" # separator between date and time
|
|
assert run_id[:8].isdigit() # YYYYMMDD
|
|
assert run_id[9:].isdigit() # HHMMSS
|