diff --git a/backend/api/runtime.py b/backend/api/runtime.py index 2cf2b50..def6fad 100644 --- a/backend/api/runtime.py +++ b/backend/api/runtime.py @@ -3,22 +3,28 @@ from __future__ import annotations +import asyncio import json +from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional -from fastapi import APIRouter, HTTPException -from pydantic import BaseModel +from fastapi import APIRouter, HTTPException, BackgroundTasks +from pydantic import BaseModel, Field from backend.runtime.agent_runtime import AgentRuntimeState from backend.runtime.context import TradingRunContext -from backend.runtime.manager import TradingRuntimeManager +from backend.runtime.manager import TradingRuntimeManager, get_global_runtime_manager router = APIRouter(prefix="/api/runtime", tags=["runtime"]) runtime_manager: Optional[TradingRuntimeManager] = None PROJECT_ROOT = Path(__file__).resolve().parents[2] +# Global task reference for running pipeline +_running_task: Optional[asyncio.Task] = None +_stop_event: Optional[asyncio.Event] = None + class RunContextResponse(BaseModel): config_name: str @@ -48,6 +54,49 @@ class RuntimeEventsResponse(BaseModel): events: List[RuntimeEvent] +class LaunchConfig(BaseModel): + """Configuration for launching a new trading task.""" + tickers: List[str] = Field(default_factory=list, description="股票池") + schedule_mode: str = Field(default="daily", description="调度模式: daily, interval") + interval_minutes: int = Field(default=60, ge=1, description="间隔分钟数") + trigger_time: str = Field(default="09:30", description="触发时间 HH:MM") + max_comm_cycles: int = Field(default=2, ge=1, description="最大会商轮数") + initial_cash: float = Field(default=100000.0, gt=0, description="初始资金") + margin_requirement: float = Field(default=0.0, ge=0, description="保证金要求") + enable_memory: bool = Field(default=False, description="是否启用长期记忆") + mode: str = Field(default="live", description="运行模式: live, backtest") + start_date: Optional[str] = Field(default=None, description="回测开始日期 YYYY-MM-DD") + end_date: Optional[str] = Field(default=None, description="回测结束日期 YYYY-MM-DD") + + +class LaunchResponse(BaseModel): + run_id: str + status: str + run_dir: str + message: str + + +class StopResponse(BaseModel): + status: str + message: str + + +class RestartResponse(BaseModel): + run_id: str + status: str + message: str + + +def _generate_run_id() -> str: + """Generate timestamp-based run ID: YYYYMMDD_HHMMSS""" + return datetime.now().strftime("%Y%m%d_%H%M%S") + + +def _get_run_dir(run_id: str) -> Path: + """Return the run directory for a given run ID.""" + return PROJECT_ROOT / "runs" / run_id + + def _latest_snapshot_path() -> Optional[Path]: candidates = sorted( PROJECT_ROOT.glob("runs/*/state/runtime_state.json"), @@ -133,3 +182,214 @@ def unregister_runtime_manager() -> None: """Drop the runtime manager reference (used for shutdown/testing).""" global runtime_manager runtime_manager = None + + +async def _stop_current_runtime(force: bool = True) -> bool: + """Stop the current running runtime if exists. + + Args: + force: If True, cancel the running task immediately + + Returns: + True if a runtime was stopped, False if no runtime was running + """ + global _running_task, _stop_event + + # Signal stop + if _stop_event is not None: + _stop_event.set() + + # Cancel running task + if _running_task is not None and not _running_task.done(): + if force: + _running_task.cancel() + try: + await _running_task + except asyncio.CancelledError: + pass + else: + # Wait for graceful shutdown + try: + await asyncio.wait_for(_running_task, timeout=30.0) + except asyncio.TimeoutError: + _running_task.cancel() + try: + await _running_task + except asyncio.CancelledError: + pass + + _running_task = None + _stop_event = None + + # Unregister runtime manager + if runtime_manager is not None: + unregister_runtime_manager() + + return True + + +@router.post("/start", response_model=LaunchResponse) +async def start_runtime( + config: LaunchConfig, + background_tasks: BackgroundTasks +) -> LaunchResponse: + """Start a new trading runtime with the given configuration. + + If a runtime is already running, it will be forcefully stopped first. + Creates a new timestamped run directory. + """ + global _running_task, _stop_event, runtime_manager + + # 1. Stop current runtime if exists + await _stop_current_runtime(force=True) + + # 2. Generate run ID and directory + run_id = _generate_run_id() + run_dir = _get_run_dir(run_id) + + # 3. Prepare bootstrap config + bootstrap = { + "tickers": config.tickers, + "schedule_mode": config.schedule_mode, + "interval_minutes": config.interval_minutes, + "trigger_time": config.trigger_time, + "max_comm_cycles": config.max_comm_cycles, + "initial_cash": config.initial_cash, + "margin_requirement": config.margin_requirement, + "enable_memory": config.enable_memory, + "mode": config.mode, + "start_date": config.start_date, + "end_date": config.end_date, + } + + # 4. Create and prepare runtime manager + runtime_manager = TradingRuntimeManager( + config_name=run_id, + run_dir=run_dir, + bootstrap=bootstrap, + ) + runtime_manager.prepare_run() + set_global_runtime_manager = None # Will be set by main module + + # 5. Write BOOTSTRAP.md + _write_bootstrap_md(run_dir, bootstrap) + + # 6. Start pipeline in background + _stop_event = asyncio.Event() + _running_task = asyncio.create_task( + _run_pipeline(run_id, run_dir, bootstrap, _stop_event) + ) + + return LaunchResponse( + run_id=run_id, + status="started", + run_dir=str(run_dir), + message=f"Runtime started with run_id: {run_id}", + ) + + +@router.post("/stop", response_model=StopResponse) +async def stop_runtime(force: bool = True) -> StopResponse: + """Stop the current running runtime. + + Args: + force: If True, forcefully cancel the running task + """ + was_running = await _stop_current_runtime(force=force) + + if not was_running: + raise HTTPException(status_code=404, detail="No runtime is currently running") + + return StopResponse( + status="stopped", + message="Runtime stopped successfully", + ) + + +@router.post("/restart", response_model=RestartResponse) +async def restart_runtime( + config: LaunchConfig, + background_tasks: BackgroundTasks +) -> RestartResponse: + """Restart the runtime with a new configuration. + + Equivalent to stop + start. + """ + # Stop current runtime + await _stop_current_runtime(force=True) + + # Start new runtime + response = await start_runtime(config, background_tasks) + + return RestartResponse( + run_id=response.run_id, + status="restarted", + message=f"Runtime restarted with run_id: {response.run_id}", + ) + + +@router.get("/current") +async def get_current_runtime(): + """Get information about the currently running runtime.""" + global _running_task, runtime_manager + + is_running = _running_task is not None and not _running_task.done() + + if not is_running or runtime_manager is None: + raise HTTPException(status_code=404, detail="No runtime is currently running") + + return { + "run_id": runtime_manager.config_name, + "run_dir": str(runtime_manager.run_dir), + "is_running": is_running, + "bootstrap": runtime_manager.bootstrap, + } + + +def _write_bootstrap_md(run_dir: Path, bootstrap: Dict[str, Any]) -> None: + """Write bootstrap configuration to BOOTSTRAP.md.""" + try: + import yaml + except ImportError: + yaml = None + + bootstrap_path = run_dir / "BOOTSTRAP.md" + bootstrap_path.parent.mkdir(parents=True, exist_ok=True) + + # Filter out None values + values = {k: v for k, v in bootstrap.items() if v is not None} + + if yaml: + front_matter = yaml.safe_dump(values, allow_unicode=True, sort_keys=False) + else: + # Fallback to JSON if yaml not available + front_matter = json.dumps(values, ensure_ascii=False, indent=2) + + content = f"---\n{front_matter}---\n" + bootstrap_path.write_text(content, encoding="utf-8") + + +async def _run_pipeline( + run_id: str, + run_dir: Path, + bootstrap: Dict[str, Any], + stop_event: asyncio.Event +) -> None: + """Background task to run the trading pipeline. + + This is a placeholder - actual implementation will integrate with main.py + """ + try: + # TODO: Integrate with main.py pipeline execution + # This should call the actual pipeline startup logic from main.py + + # For now, just wait until stop event is set + while not stop_event.is_set(): + await asyncio.sleep(1) + + except asyncio.CancelledError: + # Handle cancellation gracefully + raise + finally: + # Cleanup + pass diff --git a/backend/runtime/manager.py b/backend/runtime/manager.py index ae60156..333db96 100644 --- a/backend/runtime/manager.py +++ b/backend/runtime/manager.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import json from datetime import datetime, UTC from pathlib import Path @@ -10,6 +11,7 @@ from .context import TradingRunContext from .registry import RuntimeRegistry _global_runtime_manager: Optional["TradingRuntimeManager"] = None +_shutdown_event: Optional[asyncio.Event] = None def set_global_runtime_manager(manager: "TradingRuntimeManager") -> None: @@ -26,6 +28,28 @@ def get_global_runtime_manager() -> Optional["TradingRuntimeManager"]: return _global_runtime_manager +def set_shutdown_event(event: asyncio.Event) -> None: + """Set the global shutdown event for signaling runtime stop.""" + global _shutdown_event + _shutdown_event = event + + +def clear_shutdown_event() -> None: + """Clear the global shutdown event.""" + global _shutdown_event + _shutdown_event = None + + +def get_shutdown_event() -> Optional[asyncio.Event]: + """Get the global shutdown event if set.""" + return _shutdown_event + + +def is_shutdown_requested() -> bool: + """Check if shutdown has been requested.""" + return _shutdown_event is not None and _shutdown_event.is_set() + + class TradingRuntimeManager: def __init__(self, config_name: str, run_dir: Path, bootstrap: Optional[Dict[str, Any]] = None) -> None: self.config_name = config_name diff --git a/frontend/src/App.jsx b/frontend/src/App.jsx index 5cdc1a3..893a0c4 100644 --- a/frontend/src/App.jsx +++ b/frontend/src/App.jsx @@ -5,6 +5,7 @@ import { AGENTS, INITIAL_TICKERS } from './config/constants'; // Services import { ReadOnlyClient } from './services/websocket'; +import { startRuntime } from './services/runtimeApi'; // Hooks import { useFeedProcessor } from './hooks/useFeedProcessor'; @@ -17,7 +18,6 @@ import NetValueChart from './components/NetValueChart'; import StockLogo from './components/StockLogo'; import Header from './components/Header.jsx'; import RuntimeSettingsPanel from './components/RuntimeSettingsPanel.jsx'; -import RuntimeView from './components/RuntimeView.jsx'; // Utils import { formatNumber, formatTickerPrice } from './utils/formatters'; @@ -555,7 +555,7 @@ export default function LiveTradingApp() { } }, [enableMemoryDraft, initialCashDraft, intervalMinutesDraft, marginRequirementDraft, maxCommCyclesDraft, scheduleModeDraft, triggerTimeDraft]); - const handleLaunchConfigSave = useCallback(() => { + const handleLaunchConfigSave = useCallback(async () => { const pendingTickers = parseWatchlistInput(watchlistInputValue); const nextTickers = Array.from(new Set([...watchlistDraftSymbols, ...pendingTickers])); if (nextTickers.length === 0) { @@ -563,11 +563,6 @@ export default function LiveTradingApp() { return; } - if (!clientRef.current) { - setRuntimeConfigFeedback({ type: 'error', text: '连接未就绪,稍后重试' }); - return; - } - const interval = Number(intervalMinutesDraft); const maxCommCycles = Number(maxCommCyclesDraft); const initialCash = Number(initialCashDraft); @@ -596,26 +591,35 @@ export default function LiveTradingApp() { setWatchlistDraftSymbols(nextTickers); setWatchlistInputValue(''); - const watchlistSuccess = clientRef.current.send({ - type: 'update_watchlist', - tickers: nextTickers - }); + try { + // Call API to start new runtime with timestamp-based run directory + const result = await startRuntime({ + tickers: nextTickers, + schedule_mode: scheduleModeDraft, + interval_minutes: interval, + trigger_time: triggerTimeDraft, + max_comm_cycles: maxCommCycles, + initial_cash: initialCash, + margin_requirement: marginRequirement, + enable_memory: Boolean(enableMemoryDraft), + mode: serverMode || 'live' + }); - const runtimeSuccess = clientRef.current.send({ - type: 'update_runtime_config', - schedule_mode: scheduleModeDraft, - interval_minutes: interval, - trigger_time: triggerTimeDraft, - max_comm_cycles: maxCommCycles, - initial_cash: initialCash, - margin_requirement: marginRequirement, - enable_memory: Boolean(enableMemoryDraft) - }); - - if (!watchlistSuccess || !runtimeSuccess) { setIsRuntimeConfigSaving(false); setIsWatchlistSaving(false); - setRuntimeConfigFeedback({ type: 'error', text: '发送失败,请检查连接状态' }); + setIsRuntimeSettingsOpen(false); + setRuntimeConfigFeedback({ + type: 'success', + text: `任务已启动: ${result.run_id}` + }); + addSystemMessage(`新任务已启动: ${result.run_id}`); + } catch (error) { + setIsRuntimeConfigSaving(false); + setIsWatchlistSaving(false); + setRuntimeConfigFeedback({ + type: 'error', + text: `启动失败: ${error.message}` + }); } }, [ intervalMinutesDraft, @@ -627,7 +631,9 @@ export default function LiveTradingApp() { marginRequirementDraft, enableMemoryDraft, watchlistDraftSymbols, - watchlistInputValue + watchlistInputValue, + serverMode, + addSystemMessage ]); const handleRuntimeDefaultsRestore = useCallback(() => { @@ -2476,33 +2482,8 @@ export default function LiveTradingApp() { > 统计 - - - {currentView === 'runtime' ? ( -