Add restore-mode task launch flow

This commit is contained in:
2026-03-24 15:27:35 +08:00
parent 6413edf8c9
commit 8d6c3c5647
12 changed files with 572 additions and 52 deletions

View File

@@ -167,6 +167,8 @@ class RuntimeEventsResponse(BaseModel):
class LaunchConfig(BaseModel):
"""Configuration for launching a new trading task."""
launch_mode: str = Field(default="fresh", description="启动形式: fresh, restore")
restore_run_id: Optional[str] = Field(default=None, description="历史任务 run_id用于恢复启动")
tickers: List[str] = Field(default_factory=list, description="股票池")
schedule_mode: str = Field(default="daily", description="调度模式: daily, interval")
interval_minutes: int = Field(default=60, ge=1, description="间隔分钟数")
@@ -190,6 +192,19 @@ class LaunchResponse(BaseModel):
message: str
class RuntimeHistoryItem(BaseModel):
run_id: str
run_dir: str
updated_at: Optional[str] = None
total_trades: int = 0
total_asset_value: Optional[float] = None
bootstrap: Dict[str, Any] = Field(default_factory=dict)
class RuntimeHistoryResponse(BaseModel):
runs: List[RuntimeHistoryItem]
class StopResponse(BaseModel):
status: str
message: str
@@ -242,6 +257,96 @@ def _get_run_dir(run_id: str) -> Path:
return PROJECT_ROOT / "runs" / run_id
def _load_run_snapshot(run_id: str) -> Dict[str, Any]:
"""Load a specific run snapshot by run_id."""
snapshot_path = _get_run_dir(run_id) / "state" / "runtime_state.json"
if not snapshot_path.exists():
raise HTTPException(status_code=404, detail=f"Run snapshot not found: {run_id}")
return json.loads(snapshot_path.read_text(encoding="utf-8"))
def _copy_path_if_exists(src: Path, dst: Path) -> None:
if not src.exists():
return
if src.is_dir():
shutil.copytree(src, dst, dirs_exist_ok=True)
else:
dst.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(src, dst)
def _restore_run_assets(source_run_id: str, target_run_dir: Path) -> None:
"""Seed a fresh run directory from a historical run snapshot."""
source_run_dir = _get_run_dir(source_run_id)
if not source_run_dir.exists():
raise HTTPException(status_code=404, detail=f"Source run not found: {source_run_id}")
for relative in [
"team_dashboard",
"agents",
"skills",
"memory",
"state/server_state.json",
"state/runtime.db",
"state/research.db",
]:
_copy_path_if_exists(source_run_dir / relative, target_run_dir / relative)
def _list_runs(limit: int = 50) -> list[RuntimeHistoryItem]:
runs_root = PROJECT_ROOT / "runs"
if not runs_root.exists():
return []
items: list[RuntimeHistoryItem] = []
run_dirs = sorted(
[path for path in runs_root.iterdir() if path.is_dir()],
key=lambda path: path.stat().st_mtime,
reverse=True,
)
for run_dir in run_dirs[: max(1, int(limit))]:
run_id = run_dir.name
runtime_state_path = run_dir / "state" / "runtime_state.json"
summary_path = run_dir / "team_dashboard" / "summary.json"
bootstrap: Dict[str, Any] = {}
updated_at: Optional[str] = None
total_trades = 0
total_asset_value: Optional[float] = None
if runtime_state_path.exists():
try:
snapshot = json.loads(runtime_state_path.read_text(encoding="utf-8"))
context = snapshot.get("context") or {}
bootstrap = dict(context.get("bootstrap_values") or {})
updated_at = snapshot.get("events", [{}])[-1].get("timestamp") if snapshot.get("events") else None
except Exception:
bootstrap = {}
if summary_path.exists():
try:
summary = json.loads(summary_path.read_text(encoding="utf-8"))
total_trades = int(summary.get("totalTrades") or 0)
total_asset_value = float(summary.get("totalAssetValue")) if summary.get("totalAssetValue") is not None else None
except Exception:
total_trades = 0
total_asset_value = None
items.append(
RuntimeHistoryItem(
run_id=run_id,
run_dir=str(run_dir),
updated_at=updated_at,
total_trades=total_trades,
total_asset_value=total_asset_value,
bootstrap=bootstrap,
)
)
return items
def _is_timestamped_run_dir(path: Path) -> bool:
try:
datetime.strptime(path.name, "%Y%m%d_%H%M%S")
@@ -390,6 +495,12 @@ async def get_runtime_events() -> RuntimeEventsResponse:
)
@router.get("/history", response_model=RuntimeHistoryResponse)
async def get_runtime_history(limit: int = 20) -> RuntimeHistoryResponse:
"""List recent historical runs for restore/start selection."""
return RuntimeHistoryResponse(runs=_list_runs(limit=limit))
@router.get("/gateway/status", response_model=GatewayStatusResponse)
async def get_gateway_status() -> GatewayStatusResponse:
"""Get Gateway process status and port."""
@@ -609,9 +720,44 @@ async def start_runtime(
_stop_gateway()
await asyncio.sleep(1) # Wait for port release
# 2. Generate run ID and directory
run_id = _generate_run_id()
run_dir = _get_run_dir(run_id)
launch_mode = str(config.launch_mode or "fresh").strip().lower()
if launch_mode not in {"fresh", "restore"}:
raise HTTPException(status_code=400, detail="launch_mode must be 'fresh' or 'restore'")
# 2. Resolve run ID, directory, and bootstrap
if launch_mode == "restore":
restore_run_id = str(config.restore_run_id or "").strip()
if not restore_run_id:
raise HTTPException(status_code=400, detail="restore_run_id is required when launch_mode=restore")
snapshot = _load_run_snapshot(restore_run_id)
context = snapshot.get("context") or {}
if not context.get("config_name"):
raise HTTPException(status_code=404, detail=f"Run context not found: {restore_run_id}")
run_id = restore_run_id
run_dir = _get_run_dir(run_id)
bootstrap = dict(context.get("bootstrap_values") or {})
bootstrap["launch_mode"] = "restore"
bootstrap["restore_run_id"] = restore_run_id
else:
run_id = _generate_run_id()
run_dir = _get_run_dir(run_id)
bootstrap = {
"launch_mode": "fresh",
"restore_run_id": None,
"tickers": config.tickers,
"schedule_mode": config.schedule_mode,
"interval_minutes": config.interval_minutes,
"trigger_time": config.trigger_time,
"max_comm_cycles": config.max_comm_cycles,
"initial_cash": config.initial_cash,
"margin_requirement": config.margin_requirement,
"enable_memory": config.enable_memory,
"mode": config.mode,
"start_date": config.start_date,
"end_date": config.end_date,
"poll_interval": config.poll_interval,
"enable_mock": config.enable_mock,
}
retention_keep = max(1, int(os.getenv("RUNS_RETENTION_COUNT", "20") or "20"))
pruned_run_ids = _prune_old_timestamped_runs(
@@ -621,23 +767,6 @@ async def start_runtime(
if pruned_run_ids:
logger.info("Pruned old run directories: %s", ", ".join(pruned_run_ids))
# 3. Prepare bootstrap config
bootstrap = {
"tickers": config.tickers,
"schedule_mode": config.schedule_mode,
"interval_minutes": config.interval_minutes,
"trigger_time": config.trigger_time,
"max_comm_cycles": config.max_comm_cycles,
"initial_cash": config.initial_cash,
"margin_requirement": config.margin_requirement,
"enable_memory": config.enable_memory,
"mode": config.mode,
"start_date": config.start_date,
"end_date": config.end_date,
"poll_interval": config.poll_interval,
"enable_mock": config.enable_mock,
}
# 4. Create runtime manager
manager = TradingRuntimeManager(
config_name=run_id,