From 6413edf8c9c7e41b9055924c674a89360967cba3 Mon Sep 17 00:00:00 2001 From: cillin Date: Tue, 24 Mar 2026 15:00:35 +0800 Subject: [PATCH] Refine runtime data flow and UI layering --- README.md | 15 ++ backend/api/runtime.py | 153 ++++++++++++------ backend/core/state_sync.py | 11 +- backend/domains/news.py | 11 +- backend/services/gateway.py | 7 +- backend/services/gateway_cycle_support.py | 13 +- backend/services/gateway_runtime_support.py | 7 +- backend/services/gateway_stock_handlers.py | 1 + backend/services/storage.py | 113 ++++++++----- backend/tests/test_gateway_support_modules.py | 9 ++ backend/tests/test_runtime_service_app.py | 43 +++++ backend/utils/settlement.py | 4 +- frontend/src/components/AgentCard.jsx | 2 +- frontend/src/components/AppShell.jsx | 3 + frontend/src/components/RuntimeLogsModal.jsx | 60 ++++++- frontend/src/components/StockExplainView.jsx | 29 ++++ frontend/src/styles/GlobalStyles.jsx | 6 +- 17 files changed, 373 insertions(+), 114 deletions(-) diff --git a/README.md b/README.md index 80dd5b3..df8a281 100644 --- a/README.md +++ b/README.md @@ -110,6 +110,21 @@ evotraders frontend # Default connects to port 8765, you can modi Visit `http://localhost:5173/` to view the trading room, select a date and click Run/Replay to observe the decision-making process. +### Runtime Data Layout + +- Long-lived research data is stored in `data/market_research.db` +- Each task run writes run-scoped state under `runs//` +- `runs//team_dashboard/*.json` is an export/compatibility layer for dashboard views, not the authoritative runtime source of truth +- Runtime APIs prefer active runtime state, `server_state.json`, and `runtime.db` + +Optional retention control: + +```bash +RUNS_RETENTION_COUNT=20 +``` + +Only timestamped run folders like `YYYYMMDD_HHMMSS` are pruned automatically when starting a new runtime. Named runs such as `smoke_fullstack` or `test_*` are preserved. + --- ## System Architecture diff --git a/backend/api/runtime.py b/backend/api/runtime.py index ec153c2..df5c89e 100644 --- a/backend/api/runtime.py +++ b/backend/api/runtime.py @@ -8,6 +8,7 @@ import json import logging import os import signal +import shutil import subprocess import sys from datetime import datetime @@ -194,6 +195,12 @@ class StopResponse(BaseModel): message: str +class CleanupResponse(BaseModel): + status: str + kept: int + pruned_run_ids: List[str] + + class GatewayStatusResponse(BaseModel): is_running: bool port: int @@ -235,6 +242,38 @@ def _get_run_dir(run_id: str) -> Path: return PROJECT_ROOT / "runs" / run_id +def _is_timestamped_run_dir(path: Path) -> bool: + try: + datetime.strptime(path.name, "%Y%m%d_%H%M%S") + return True + except ValueError: + return False + + +def _prune_old_timestamped_runs(*, keep: int = 20, exclude_run_ids: Optional[set[str]] = None) -> list[str]: + """Prune old timestamped run directories, preserving the newest N and excluded ids.""" + exclude = exclude_run_ids or set() + runs_root = PROJECT_ROOT / "runs" + if not runs_root.exists(): + return [] + + candidates = sorted( + [ + path + for path in runs_root.iterdir() + if path.is_dir() and _is_timestamped_run_dir(path) and path.name not in exclude + ], + key=lambda path: path.name, + reverse=True, + ) + + pruned: list[str] = [] + for path in candidates[max(0, keep):]: + shutil.rmtree(path, ignore_errors=True) + pruned.append(path.name) + return pruned + + def _find_available_port(start_port: int = 8765, max_port: int = 9000) -> int: """Find an available port for Gateway.""" import socket @@ -316,15 +355,9 @@ def _start_gateway_process( @router.get("/context", response_model=RunContextResponse) async def get_run_context() -> RunContextResponse: - """Return the most recent run context.""" - snapshot_path = PROJECT_ROOT.glob("runs/*/state/runtime_state.json") - snapshots = sorted(snapshot_path, key=lambda p: p.stat().st_mtime, reverse=True) - - if not snapshots: - raise HTTPException(status_code=404, detail="No run context available") - - latest = json.loads(snapshots[0].read_text(encoding="utf-8")) - context = latest.get("context") + """Return active runtime context, or latest persisted context when stopped.""" + snapshot = _get_active_runtime_snapshot() if _is_gateway_running() else _load_latest_runtime_snapshot() + context = snapshot.get("context") if context is None: raise HTTPException(status_code=404, detail="Run context is not ready") @@ -337,15 +370,9 @@ async def get_run_context() -> RunContextResponse: @router.get("/agents", response_model=RuntimeAgentsResponse) async def get_runtime_agents() -> RuntimeAgentsResponse: - """Return agent states from the most recent run.""" - snapshot_path = PROJECT_ROOT.glob("runs/*/state/runtime_state.json") - snapshots = sorted(snapshot_path, key=lambda p: p.stat().st_mtime, reverse=True) - - if not snapshots: - raise HTTPException(status_code=404, detail="No runtime state available") - - latest = json.loads(snapshots[0].read_text(encoding="utf-8")) - agents = latest.get("agents", []) + """Return agent states from the active runtime, or latest persisted run.""" + snapshot = _get_active_runtime_snapshot() if _is_gateway_running() else _load_latest_runtime_snapshot() + agents = snapshot.get("agents", []) return RuntimeAgentsResponse( agents=[RuntimeAgentState(**a) for a in agents] @@ -354,15 +381,9 @@ async def get_runtime_agents() -> RuntimeAgentsResponse: @router.get("/events", response_model=RuntimeEventsResponse) async def get_runtime_events() -> RuntimeEventsResponse: - """Return events from the most recent run.""" - snapshot_path = PROJECT_ROOT.glob("runs/*/state/runtime_state.json") - snapshots = sorted(snapshot_path, key=lambda p: p.stat().st_mtime, reverse=True) - - if not snapshots: - raise HTTPException(status_code=404, detail="No runtime state available") - - latest = json.loads(snapshots[0].read_text(encoding="utf-8")) - events = latest.get("events", []) + """Return events from the active runtime, or latest persisted run.""" + snapshot = _get_active_runtime_snapshot() if _is_gateway_running() else _load_latest_runtime_snapshot() + events = snapshot.get("events", []) return RuntimeEventsResponse( events=[RuntimeEvent(**e) for e in events] @@ -376,15 +397,10 @@ async def get_gateway_status() -> GatewayStatusResponse: run_id = None if is_running: - # Try to find run_id from runtime state - snapshot_path = PROJECT_ROOT.glob("runs/*/state/runtime_state.json") - snapshots = sorted(snapshot_path, key=lambda p: p.stat().st_mtime, reverse=True) - if snapshots: - try: - latest = json.loads(snapshots[0].read_text(encoding="utf-8")) - run_id = latest.get("context", {}).get("config_name") - except Exception as e: - logger.warning(f"Failed to parse latest snapshot: {e}") + try: + run_id = _get_active_runtime_context().get("config_name") + except Exception as e: + logger.warning(f"Failed to resolve active runtime context: {e}") return GatewayStatusResponse( is_running=is_running, @@ -408,7 +424,7 @@ async def get_gateway_port(request: Request) -> Dict[str, Any]: async def get_runtime_logs() -> RuntimeLogResponse: """Return current runtime log tail, or the latest run log if runtime is stopped.""" try: - context = _get_runtime_context_from_latest_snapshot() + context = _get_active_runtime_context() if _is_gateway_running() else _get_runtime_context_from_latest_snapshot() except HTTPException: return RuntimeLogResponse(is_running=False, content="") @@ -450,6 +466,21 @@ def _load_latest_runtime_snapshot() -> Dict[str, Any]: return json.loads(snapshots[0].read_text(encoding="utf-8")) +def _get_active_runtime_snapshot() -> Dict[str, Any]: + """Return the active runtime snapshot, preferring in-memory manager state.""" + if not _is_gateway_running(): + raise HTTPException(status_code=404, detail="No runtime is currently running") + + manager = _runtime_state.runtime_manager + if manager is not None and hasattr(manager, "build_snapshot"): + snapshot = manager.build_snapshot() + context = snapshot.get("context") or {} + if context.get("config_name"): + return snapshot + + return _load_latest_runtime_snapshot() + + def _get_runtime_context_from_latest_snapshot() -> Dict[str, Any]: """Return the latest persisted runtime context regardless of active process state.""" latest = _load_latest_runtime_snapshot() @@ -476,7 +507,16 @@ def _get_current_runtime_context() -> Dict[str, Any]: """Return the active runtime context from the latest snapshot.""" if not _is_gateway_running(): raise HTTPException(status_code=404, detail="No runtime is currently running") - return _get_runtime_context_from_latest_snapshot() + snapshot = _get_active_runtime_snapshot() + context = snapshot.get("context") or {} + if not context.get("config_name"): + raise HTTPException(status_code=404, detail="No runtime context available") + return context + + +def _get_active_runtime_context() -> Dict[str, Any]: + """Return the active runtime context, preferring in-memory runtime manager state.""" + return _get_current_runtime_context() def _resolve_runtime_response(run_id: str) -> RuntimeConfigResponse: @@ -573,6 +613,14 @@ async def start_runtime( run_id = _generate_run_id() run_dir = _get_run_dir(run_id) + retention_keep = max(1, int(os.getenv("RUNS_RETENTION_COUNT", "20") or "20")) + pruned_run_ids = _prune_old_timestamped_runs( + keep=retention_keep, + exclude_run_ids={run_id}, + ) + if pruned_run_ids: + logger.info("Pruned old run directories: %s", ", ".join(pruned_run_ids)) + # 3. Prepare bootstrap config bootstrap = { "tickers": config.tickers, @@ -690,6 +738,25 @@ async def stop_runtime(force: bool = True) -> StopResponse: ) +@router.post("/cleanup", response_model=CleanupResponse) +async def cleanup_old_runs(keep: int = 20) -> CleanupResponse: + """Prune old timestamped run directories while preserving named runs.""" + keep_count = max(1, int(keep)) + exclude: set[str] = set() + + if _is_gateway_running(): + try: + active_context = _get_active_runtime_context() + active_run_id = str(active_context.get("config_name") or "").strip() + if active_run_id: + exclude.add(active_run_id) + except HTTPException: + pass + + pruned = _prune_old_timestamped_runs(keep=keep_count, exclude_run_ids=exclude) + return CleanupResponse(status="ok", kept=keep_count, pruned_run_ids=pruned) + + @router.post("/restart") async def restart_runtime( config: LaunchConfig, @@ -716,15 +783,7 @@ async def get_current_runtime(): if not _is_gateway_running(): raise HTTPException(status_code=404, detail="No runtime is currently running") - # Find latest runtime state - snapshot_path = PROJECT_ROOT.glob("runs/*/state/runtime_state.json") - snapshots = sorted(snapshot_path, key=lambda p: p.stat().st_mtime, reverse=True) - - if not snapshots: - raise HTTPException(status_code=404, detail="No runtime information available") - - latest = json.loads(snapshots[0].read_text(encoding="utf-8")) - context = latest.get("context", {}) + context = _get_active_runtime_context() return { "run_id": context.get("config_name"), diff --git a/backend/core/state_sync.py b/backend/core/state_sync.py index 43a1ad2..304e835 100644 --- a/backend/core/state_sync.py +++ b/backend/core/state_sync.py @@ -488,12 +488,13 @@ class StateSync: } if include_dashboard: + dashboard_snapshot = self.storage.build_dashboard_snapshot_from_state(self._state) payload["dashboard"] = { - "summary": self.storage.load_file("summary"), - "holdings": self.storage.load_file("holdings"), - "stats": self.storage.load_file("stats"), - "trades": self.storage.load_file("trades"), - "leaderboard": self.storage.load_file("leaderboard"), + "summary": dashboard_snapshot.get("summary"), + "holdings": dashboard_snapshot.get("holdings"), + "stats": dashboard_snapshot.get("stats"), + "trades": dashboard_snapshot.get("trades"), + "leaderboard": dashboard_snapshot.get("leaderboard"), } return payload diff --git a/backend/domains/news.py b/backend/domains/news.py index c3dc2ed..cf147a3 100644 --- a/backend/domains/news.py +++ b/backend/domains/news.py @@ -30,6 +30,7 @@ def ensure_news_fresh( *, ticker: str, target_date: str | None = None, + refresh_if_stale: bool = True, ) -> dict[str, Any]: """Refresh raw news incrementally when stored watermarks are stale.""" normalized_target = str(target_date or "").strip()[:10] @@ -44,7 +45,7 @@ def ensure_news_fresh( watermarks = store.get_ticker_watermarks(ticker) last_news_fetch = str(watermarks.get("last_news_fetch") or "").strip()[:10] refreshed = False - if not last_news_fetch or last_news_fetch < normalized_target: + if refresh_if_stale and (not last_news_fetch or last_news_fetch < normalized_target): update_ticker_incremental( ticker, end_date=normalized_target, @@ -238,8 +239,14 @@ def get_range_explain_payload( end_date: str, article_ids: list[str] | None = None, limit: int = 100, + refresh_if_stale: bool = False, ) -> dict[str, Any]: - freshness = ensure_news_fresh(store, ticker=ticker, target_date=end_date) + freshness = ensure_news_fresh( + store, + ticker=ticker, + target_date=end_date, + refresh_if_stale=refresh_if_stale, + ) news_rows = [] if article_ids: news_rows = store.get_news_by_ids_enriched(ticker, article_ids) diff --git a/backend/services/gateway.py b/backend/services/gateway.py index aa30822..7c7252d 100644 --- a/backend/services/gateway.py +++ b/backend/services/gateway.py @@ -152,10 +152,11 @@ class Gateway: ) # Load and display existing portfolio state if available - summary = self.storage.load_file("summary") + dashboard_snapshot = self.storage.build_dashboard_snapshot_from_state(self.state_sync.state) + summary = dashboard_snapshot.get("summary") if summary: - holdings = self.storage.load_file("holdings") or [] - trades = self.storage.load_file("trades") or [] + holdings = dashboard_snapshot.get("holdings") or [] + trades = dashboard_snapshot.get("trades") or [] current_date = self.state_sync.state.get("current_date") self._dashboard.update( date=current_date or "-", diff --git a/backend/services/gateway_cycle_support.py b/backend/services/gateway_cycle_support.py index d5a1319..363e07c 100644 --- a/backend/services/gateway_cycle_support.py +++ b/backend/services/gateway_cycle_support.py @@ -61,7 +61,7 @@ async def market_status_monitor(gateway: Any) -> None: status = gateway.market_service.get_market_status() if status["status"] == "open" and not gateway.storage.is_live_session_active: gateway.storage.start_live_session() - summary = gateway.storage.load_file("summary") or {} + summary = gateway.storage.build_dashboard_snapshot_from_state(gateway.state_sync.state).get("summary") or {} gateway._session_start_portfolio_value = summary.get( "totalAssetValue", gateway.storage.initial_cash, @@ -240,14 +240,15 @@ async def run_live_cycle(gateway: Any, date: str, tickers: list[str]) -> None: async def finalize_cycle(gateway: Any, date: str) -> None: - summary = gateway.storage.load_file("summary") or {} + dashboard_snapshot = gateway.storage.build_dashboard_snapshot_from_state(gateway.state_sync.state) + summary = dashboard_snapshot.get("summary") or {} if gateway.storage.is_live_session_active: summary.update(gateway.storage.get_live_returns()) await gateway.state_sync.on_cycle_end(date, portfolio_summary=summary) - holdings = gateway.storage.load_file("holdings") or [] - trades = gateway.storage.load_file("trades") or [] - leaderboard = gateway.storage.load_file("leaderboard") or [] + holdings = dashboard_snapshot.get("holdings") or [] + trades = dashboard_snapshot.get("trades") or [] + leaderboard = dashboard_snapshot.get("leaderboard") or [] if leaderboard: await gateway.state_sync.on_leaderboard_update(leaderboard) gateway._dashboard.update(date=date, status="Running", portfolio=summary, holdings=holdings, trades=trades) @@ -319,7 +320,7 @@ async def run_backtest_dates(gateway: Any, dates: list[str]) -> None: await gateway.on_strategy_trigger(date=date) await asyncio.sleep(0.1) await gateway.state_sync.on_system_message(f"Backtest complete - {len(dates)} days") - summary = gateway.storage.load_file("summary") or {} + summary = gateway.storage.build_dashboard_snapshot_from_state(gateway.state_sync.state).get("summary") or {} gateway._dashboard.update(status="Complete", portfolio=summary, days_completed=len(dates)) gateway._dashboard.stop() gateway._dashboard.print_final_summary() diff --git a/backend/services/gateway_runtime_support.py b/backend/services/gateway_runtime_support.py index acfdff7..1e9bfb7 100644 --- a/backend/services/gateway_runtime_support.py +++ b/backend/services/gateway_runtime_support.py @@ -164,9 +164,10 @@ def sync_runtime_state(gateway: Any) -> None: gateway._dashboard.initial_cash = gateway.storage.initial_cash gateway._dashboard.enable_memory = bool(gateway.config.get("enable_memory", False)) - summary = gateway.storage.load_file("summary") or {} - holdings = gateway.storage.load_file("holdings") or [] - trades = gateway.storage.load_file("trades") or [] + dashboard_snapshot = gateway.storage.build_dashboard_snapshot_from_state(gateway.state_sync.state) + summary = dashboard_snapshot.get("summary") or {} + holdings = dashboard_snapshot.get("holdings") or [] + trades = dashboard_snapshot.get("trades") or [] gateway._dashboard.update( portfolio=summary, holdings=holdings, diff --git a/backend/services/gateway_stock_handlers.py b/backend/services/gateway_stock_handlers.py index 5f25b17..7b1a037 100644 --- a/backend/services/gateway_stock_handlers.py +++ b/backend/services/gateway_stock_handlers.py @@ -361,6 +361,7 @@ async def handle_get_stock_range_explain(gateway: Any, websocket: Any, data: dic end_date=end_date, article_ids=article_ids if isinstance(article_ids, list) else None, limit=100, + refresh_if_stale=False, ) result = payload.get("result") diff --git a/backend/services/storage.py b/backend/services/storage.py index 79d5d23..960ee5b 100644 --- a/backend/services/storage.py +++ b/backend/services/storage.py @@ -11,7 +11,6 @@ from pathlib import Path from typing import Any, Dict, List, Optional from backend.data.market_store import MarketStore -from .research_db import ResearchDb from .runtime_db import RuntimeDb logger = logging.getLogger(__name__) @@ -22,12 +21,18 @@ class StorageService: Storage service for data persistence Responsibilities: - 1. Load/save dashboard JSON files + 1. Export dashboard JSON files (summary, holdings, stats, trades, leaderboard) 2. Load/save internal state (_internal_state.json) 3. Load/save server state (server_state.json) with feed history 4. Manage portfolio state persistence 5. Support loading from saved state to resume execution + + Notes: + - team_dashboard/*.json is treated as an export/compatibility layer + rather than the authoritative runtime source of truth. + - authoritative runtime reads should prefer in-memory state, server_state, + runtime.db, and market_research.db. """ def __init__( @@ -49,7 +54,7 @@ class StorageService: self.initial_cash = initial_cash self.config_name = config_name - # Dashboard file paths + # Dashboard export file paths self.files = { "summary": self.dashboard_dir / "summary.json", "holdings": self.dashboard_dir / "holdings.json", @@ -66,7 +71,6 @@ class StorageService: self.state_dir.mkdir(parents=True, exist_ok=True) self.server_state_file = self.state_dir / "server_state.json" self.runtime_db = RuntimeDb(self.state_dir / "runtime.db") - self.research_db = ResearchDb(self.state_dir / "research.db") self.market_store = MarketStore() # Feed history (for agent messages) @@ -84,16 +88,8 @@ class StorageService: logger.info(f"Storage service initialized: {self.dashboard_dir}") - def load_file(self, file_type: str) -> Optional[Any]: - """ - Load dashboard JSON file - - Args: - file_type: One of: summary, holdings, stats, trades, leaderboard - - Returns: - Loaded data or None if file doesn't exist - """ + def load_export_file(self, file_type: str) -> Optional[Any]: + """Load dashboard export JSON file.""" file_path = self.files.get(file_type) if not file_path or not file_path.exists(): return None @@ -105,14 +101,12 @@ class StorageService: logger.error(f"Failed to load {file_type}.json: {e}") return None - def save_file(self, file_type: str, data: Any): - """ - Save dashboard JSON file + def load_file(self, file_type: str) -> Optional[Any]: + """Backward-compatible alias for export-layer JSON reads.""" + return self.load_export_file(file_type) - Args: - file_type: One of: summary, holdings, stats, trades, leaderboard - data: Data to save - """ + def save_export_file(self, file_type: str, data: Any): + """Save dashboard export JSON file.""" file_path = self.files.get(file_type) if not file_path: logger.error(f"Unknown file type: {file_type}") @@ -129,6 +123,48 @@ class StorageService: except Exception as e: logger.error(f"Failed to save {file_type}.json: {e}") + def save_file(self, file_type: str, data: Any): + """Backward-compatible alias for export-layer JSON writes.""" + self.save_export_file(file_type, data) + + def build_dashboard_snapshot_from_state( + self, + state: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: + """Build dashboard view data from runtime state instead of JSON exports.""" + runtime_state = state or self.load_server_state() + portfolio = dict(runtime_state.get("portfolio") or {}) + holdings = list(runtime_state.get("holdings") or []) + stats = runtime_state.get("stats") or self._get_default_stats() + trades = list(runtime_state.get("trades") or []) + leaderboard = list(runtime_state.get("leaderboard") or []) + + summary = { + "totalAssetValue": portfolio.get("total_value", self.initial_cash), + "totalReturn": portfolio.get("pnl_percent", 0.0), + "cashPosition": portfolio.get("cash", self.initial_cash), + "tickerWeights": stats.get("tickerWeights", {}), + "totalTrades": len(trades), + "pnlPct": portfolio.get("pnl_percent", 0.0), + "balance": portfolio.get("total_value", self.initial_cash), + "equity": portfolio.get("equity", []), + "baseline": portfolio.get("baseline", []), + "baseline_vw": portfolio.get("baseline_vw", []), + "momentum": portfolio.get("momentum", []), + "equity_return": portfolio.get("equity_return", []), + "baseline_return": portfolio.get("baseline_return", []), + "baseline_vw_return": portfolio.get("baseline_vw_return", []), + "momentum_return": portfolio.get("momentum_return", []), + } + + return { + "summary": summary, + "holdings": holdings, + "stats": stats, + "trades": trades, + "leaderboard": leaderboard, + } + def check_file_updates(self) -> Dict[str, bool]: """ Check which dashboard files have been updated since last check @@ -297,7 +333,7 @@ class StorageService: def initialize_empty_dashboard(self): """Initialize empty dashboard files with default values""" # Summary - self.save_file( + self.save_export_file( "summary", { "totalAssetValue": self.initial_cash, @@ -315,10 +351,10 @@ class StorageService: ) # Holdings - self.save_file("holdings", []) + self.save_export_file("holdings", []) # Stats - self.save_file( + self.save_export_file( "stats", { "totalAssetValue": self.initial_cash, @@ -335,7 +371,7 @@ class StorageService: ) # Trades - self.save_file("trades", []) + self.save_export_file("trades", []) # Leaderboard with model info self.generate_leaderboard() @@ -375,7 +411,7 @@ class StorageService: ranking_entries.append(entry) leaderboard = team_entries + ranking_entries - self.save_file("leaderboard", leaderboard) + self.save_export_file("leaderboard", leaderboard) logger.info("Leaderboard generated with model info") def update_leaderboard_model_info(self): @@ -398,7 +434,7 @@ class StorageService: entry["modelName"] = model_name entry["modelProvider"] = model_provider - self.save_file("leaderboard", existing) + self.save_export_file("leaderboard", existing) logger.info("Leaderboard model info updated") def get_current_timestamp_ms(self, date: str = None) -> int: @@ -653,7 +689,7 @@ class StorageService: "momentum": state.get("momentum_history", []), } - self.save_file("summary", summary) + self.save_export_file("summary", summary) def _generate_holdings( self, @@ -715,7 +751,7 @@ class StorageService: # Sort by weight holdings.sort(key=lambda x: abs(x["weight"]), reverse=True) - self.save_file("holdings", holdings) + self.save_export_file("holdings", holdings) def _generate_stats(self, state: Dict[str, Any], net_value: float): """Generate stats.json""" @@ -738,7 +774,7 @@ class StorageService: }, } - self.save_file("stats", stats) + self.save_export_file("stats", stats) def _generate_trades(self, state: Dict[str, Any]): """Generate trades.json""" @@ -764,7 +800,7 @@ class StorageService: }, ) - self.save_file("trades", trades) + self.save_export_file("trades", trades) # Server State Management Methods @@ -1001,12 +1037,12 @@ class StorageService: Args: state: Server state dictionary to update """ - # Load dashboard data - summary = self.load_file("summary") or {} - holdings = self.load_file("holdings") or [] - stats = self.load_file("stats") or self._get_default_stats() - trades = self.load_file("trades") or [] - leaderboard = self.load_file("leaderboard") or [] + dashboard_snapshot = self.build_dashboard_snapshot_from_state(state) + summary = dashboard_snapshot.get("summary") or {} + holdings = dashboard_snapshot.get("holdings") or [] + stats = dashboard_snapshot.get("stats") or self._get_default_stats() + trades = dashboard_snapshot.get("trades") or [] + leaderboard = dashboard_snapshot.get("leaderboard") or [] internal_state = self.load_internal_state() # Update state @@ -1040,7 +1076,6 @@ class StorageService: Start tracking live returns for current trading session. Captures current values as session start baseline. """ - summary = self.load_file("summary") or {} state = self.load_internal_state() # Capture current values as session start @@ -1052,7 +1087,7 @@ class StorageService: self._session_start_equity = ( equity_history[-1]["v"] if equity_history - else summary.get("totalAssetValue", self.initial_cash) + else self.initial_cash ) self._session_start_baseline = ( baseline_history[-1]["v"] diff --git a/backend/tests/test_gateway_support_modules.py b/backend/tests/test_gateway_support_modules.py index 1812828..d5681a1 100644 --- a/backend/tests/test_gateway_support_modules.py +++ b/backend/tests/test_gateway_support_modules.py @@ -77,6 +77,15 @@ class _DummyStorage: return {"totalAssetValue": self.initial_cash} return [] + def build_dashboard_snapshot_from_state(self, state): + return { + "summary": {"totalAssetValue": self.initial_cash}, + "holdings": [], + "stats": {}, + "trades": [], + "leaderboard": [], + } + class _DummyPM: def __init__(self): diff --git a/backend/tests/test_runtime_service_app.py b/backend/tests/test_runtime_service_app.py index 2c406fa..766234d 100644 --- a/backend/tests/test_runtime_service_app.py +++ b/backend/tests/test_runtime_service_app.py @@ -17,6 +17,7 @@ def test_runtime_service_routes_are_exposed(): assert "/api/status" in paths assert "/api/runtime/start" in paths assert "/api/runtime/stop" in paths + assert "/api/runtime/cleanup" in paths assert "/api/runtime/current" in paths assert "/api/runtime/gateway/port" in paths @@ -192,3 +193,45 @@ def test_runtime_service_update_runtime_config_persists_bootstrap(monkeypatch, t assert payload["bootstrap"]["schedule_mode"] == "intraday" assert payload["resolved"]["interval_minutes"] == 15 assert "interval_minutes: 15" in (run_dir / "BOOTSTRAP.md").read_text(encoding="utf-8") + + +def test_prune_old_timestamped_runs_keeps_named_runs(monkeypatch, tmp_path): + runs_dir = tmp_path / "runs" + runs_dir.mkdir() + + keep_dirs = ["20260324_110000", "20260324_120000"] + prune_dir = "20260324_100000" + named_dir = "smoke_fullstack" + + for name in [*keep_dirs, prune_dir, named_dir]: + (runs_dir / name).mkdir(parents=True) + + monkeypatch.setattr(runtime_module, "PROJECT_ROOT", tmp_path) + + pruned = runtime_module._prune_old_timestamped_runs(keep=1, exclude_run_ids={"20260324_120000"}) + + assert prune_dir in pruned + assert (runs_dir / named_dir).exists() + assert (runs_dir / "20260324_120000").exists() + assert (runs_dir / "20260324_110000").exists() + + +def test_runtime_cleanup_endpoint_prunes_old_runs(monkeypatch, tmp_path): + runs_dir = tmp_path / "runs" + runs_dir.mkdir() + + for name in ["20260324_090000", "20260324_100000", "20260324_110000", "smoke_fullstack"]: + (runs_dir / name).mkdir(parents=True) + + monkeypatch.setattr(runtime_module, "PROJECT_ROOT", tmp_path) + monkeypatch.setattr(runtime_module, "_is_gateway_running", lambda: False) + + with TestClient(create_app()) as client: + response = client.post("/api/runtime/cleanup?keep=1") + + assert response.status_code == 200 + payload = response.json() + assert payload["status"] == "ok" + assert sorted(payload["pruned_run_ids"]) == ["20260324_090000", "20260324_100000"] + assert (runs_dir / "20260324_110000").exists() + assert (runs_dir / "smoke_fullstack").exists() diff --git a/backend/utils/settlement.py b/backend/utils/settlement.py index d4f714e..6ae52f3 100644 --- a/backend/utils/settlement.py +++ b/backend/utils/settlement.py @@ -228,12 +228,12 @@ class SettlementCoordinator: all_evaluations = {**analyst_evaluations, **pm_evaluations} - leaderboard = self.storage.load_file("leaderboard") or [] + leaderboard = self.storage.load_export_file("leaderboard") or [] updated_leaderboard = update_leaderboard_with_evaluations( leaderboard, all_evaluations, ) - self.storage.save_file("leaderboard", updated_leaderboard) + self.storage.save_export_file("leaderboard", updated_leaderboard) self._update_summary_with_baselines( date, diff --git a/frontend/src/components/AgentCard.jsx b/frontend/src/components/AgentCard.jsx index bdcb041..c10e07d 100644 --- a/frontend/src/components/AgentCard.jsx +++ b/frontend/src/components/AgentCard.jsx @@ -57,7 +57,7 @@ export default function AgentCard({ agent, onClose, isClosing }) { background: '#ffffff', borderBottom: '2px solid #000000', boxShadow: '0 4px 12px rgba(0, 0, 0, 0.1)', - zIndex: 1000, + zIndex: 800, animation: isClosing ? 'slideUp 0.2s ease-out forwards' : 'slideDown 0.25s ease-out' }}> {/* Horizontal scrollable content */} diff --git a/frontend/src/components/AppShell.jsx b/frontend/src/components/AppShell.jsx index 1c4b280..d6eb41a 100644 --- a/frontend/src/components/AppShell.jsx +++ b/frontend/src/components/AppShell.jsx @@ -455,6 +455,9 @@ export default function AppShell({ newsSnapshot={newsByTicker[selectedExplainSymbol] || null} insiderTradesSnapshot={insiderTradesByTicker[selectedExplainSymbol] || null} technicalIndicatorsSnapshot={technicalIndicatorsByTicker[selectedExplainSymbol] || null} + onRequestHistory={stockRequests?.requestStockHistory} + onRequestExplainEvents={stockRequests?.requestStockExplainEvents} + onRequestNews={stockRequests?.requestStockNews} onRequestRangeExplain={stockRequests?.requestStockRangeExplain} onRequestNewsForDate={stockRequests?.requestStockNewsForDate} onRequestStory={stockRequests?.requestStockStory} diff --git a/frontend/src/components/RuntimeLogsModal.jsx b/frontend/src/components/RuntimeLogsModal.jsx index 77aa615..45fd128 100644 --- a/frontend/src/components/RuntimeLogsModal.jsx +++ b/frontend/src/components/RuntimeLogsModal.jsx @@ -1,4 +1,4 @@ -import React from 'react'; +import React, { useEffect, useMemo, useRef, useState } from 'react'; import { createPortal } from 'react-dom'; export default function RuntimeLogsModal({ @@ -9,6 +9,32 @@ export default function RuntimeLogsModal({ onClose, onRefresh }) { + const logRef = useRef(null); + const [autoRefresh, setAutoRefresh] = useState(true); + const [followTail, setFollowTail] = useState(true); + + const refreshIntervalMs = useMemo(() => 2000, []); + + useEffect(() => { + if (!isOpen || !autoRefresh) { + return undefined; + } + + const timerId = window.setInterval(() => { + onRefresh(); + }, refreshIntervalMs); + + return () => window.clearInterval(timerId); + }, [autoRefresh, isOpen, onRefresh, refreshIntervalMs]); + + useEffect(() => { + if (!isOpen || !followTail || !logRef.current) { + return; + } + + logRef.current.scrollTop = logRef.current.scrollHeight; + }, [followTail, isOpen, logPayload?.content]); + if (!isOpen) { return null; } @@ -108,8 +134,35 @@ export default function RuntimeLogsModal({ ) : null} +
+ + +
+
-
+          }}
+          >
             {logPayload?.content || '暂无日志输出'}
           
diff --git a/frontend/src/components/StockExplainView.jsx b/frontend/src/components/StockExplainView.jsx index 644d66a..7ab5d22 100644 --- a/frontend/src/components/StockExplainView.jsx +++ b/frontend/src/components/StockExplainView.jsx @@ -33,6 +33,9 @@ export default function StockExplainView({ insiderTradesSnapshot, technicalIndicatorsSnapshot, onRequestRangeExplain, + onRequestHistory, + onRequestExplainEvents, + onRequestNews, onRequestNewsForDate, onRequestStory, onRequestInsiderTrades, @@ -142,6 +145,32 @@ export default function StockExplainView({ setActiveNewsSentiment('all'); }, [selectedSymbol, selectedEventDate]); + useEffect(() => { + if (!selectedSymbol) { + return; + } + + if (onRequestHistory && (!Array.isArray(ohlcHistoryByTicker?.[selectedSymbol]) || ohlcHistoryByTicker[selectedSymbol].length === 0)) { + onRequestHistory(selectedSymbol); + } + + if (onRequestExplainEvents && !explainEventsSnapshot) { + onRequestExplainEvents(selectedSymbol); + } + + if (onRequestNews && (!Array.isArray(newsSnapshot?.items) || newsSnapshot.items.length === 0)) { + onRequestNews(selectedSymbol); + } + }, [ + explainEventsSnapshot, + newsSnapshot, + ohlcHistoryByTicker, + onRequestExplainEvents, + onRequestHistory, + onRequestNews, + selectedSymbol, + ]); + useEffect(() => { if (!selectedSymbol || !selectedEventDate || !onRequestNewsForDate) { return; diff --git a/frontend/src/styles/GlobalStyles.jsx b/frontend/src/styles/GlobalStyles.jsx index 040b0fb..acdfd06 100644 --- a/frontend/src/styles/GlobalStyles.jsx +++ b/frontend/src/styles/GlobalStyles.jsx @@ -578,7 +578,7 @@ export default function GlobalStyles() { left: 0; right: 0; bottom: 0; - z-index: 999; + z-index: 700; } .room-scene-wrapper { @@ -680,7 +680,7 @@ export default function GlobalStyles() { line-height: 1.5; animation: bubbleAppear 0.4s cubic-bezier(0.34, 1.56, 0.64, 1); overflow: hidden; - z-index: 30; + z-index: 1500; } @keyframes bubbleAppear { @@ -713,7 +713,7 @@ export default function GlobalStyles() { right: 8px; display: flex; gap: 4px; - z-index: 10; + z-index: 1510; } .bubble-jump-btn,