# -*- coding: utf-8 -*- """Run-scoped team pipeline configuration helpers.""" from __future__ import annotations from pathlib import Path from typing import Iterable, List, Dict, Any import yaml DEFAULT_FILENAME = "TEAM_PIPELINE.yaml" def team_pipeline_path(project_root: Path, config_name: str) -> Path: """Return run-scoped team pipeline config path.""" return project_root / "runs" / config_name / DEFAULT_FILENAME def ensure_team_pipeline_config( project_root: Path, config_name: str, default_analysts: Iterable[str], ) -> Path: """Ensure TEAM_PIPELINE.yaml exists for one run.""" path = team_pipeline_path(project_root, config_name) path.parent.mkdir(parents=True, exist_ok=True) if path.exists(): return path payload = { "version": 1, "controller_agent": "portfolio_manager", "discussion": { "allow_dynamic_team_update": True, "active_analysts": list(default_analysts), }, "decision": { "require_risk_manager": True, }, } path.write_text( yaml.safe_dump(payload, allow_unicode=True, sort_keys=False), encoding="utf-8", ) return path def load_team_pipeline_config(project_root: Path, config_name: str) -> Dict[str, Any]: """Load TEAM_PIPELINE.yaml and return parsed dict.""" path = team_pipeline_path(project_root, config_name) if not path.exists(): return {} parsed = yaml.safe_load(path.read_text(encoding="utf-8")) or {} return parsed if isinstance(parsed, dict) else {} def save_team_pipeline_config( project_root: Path, config_name: str, config: Dict[str, Any], ) -> Path: """Persist TEAM_PIPELINE.yaml.""" path = team_pipeline_path(project_root, config_name) path.parent.mkdir(parents=True, exist_ok=True) path.write_text( yaml.safe_dump(config, allow_unicode=True, sort_keys=False), encoding="utf-8", ) return path def resolve_active_analysts( project_root: Path, config_name: str, available_analysts: Iterable[str], ) -> List[str]: """Resolve active analysts from TEAM_PIPELINE.yaml.""" available = [item for item in available_analysts] parsed = load_team_pipeline_config(project_root, config_name) discussion = parsed.get("discussion", {}) if isinstance(parsed, dict) else {} configured = discussion.get("active_analysts", []) if not isinstance(configured, list) or not configured: return available active = [item for item in configured if item in available] return active or available def update_active_analysts( project_root: Path, config_name: str, available_analysts: Iterable[str], *, add: Iterable[str] | None = None, remove: Iterable[str] | None = None, set_to: Iterable[str] | None = None, ) -> List[str]: """Update active analysts and persist TEAM_PIPELINE.yaml.""" available = [item for item in available_analysts] ensure_team_pipeline_config(project_root, config_name, available) parsed = load_team_pipeline_config(project_root, config_name) discussion = parsed.setdefault("discussion", {}) if not isinstance(discussion, dict): discussion = {} parsed["discussion"] = discussion current = discussion.get("active_analysts", []) if not isinstance(current, list): current = [] current = [item for item in current if item in available] if not current: current = list(available) if set_to is not None: target = [item for item in set_to if item in available] current = target or current for item in add or []: if item in available and item not in current: current.append(item) for item in remove or []: current = [existing for existing in current if existing != item] if not current: current = [available[0]] if available else [] discussion["active_analysts"] = current save_team_pipeline_config(project_root, config_name, parsed) return current