# -*- coding: utf-8 -*-
"""Manage agent-installed and run-active skill directories for each run."""

from pathlib import Path
import shutil
from typing import Dict, Iterable, List

import yaml

from backend.agents.agent_workspace import load_agent_workspace_config
from backend.agents.skill_metadata import SkillMetadata, parse_skill_metadata
from backend.config.bootstrap_config import get_bootstrap_config_for_run


class SkillsManager:
    """Sync named skills into a run-scoped active skills workspace.

    Directory layout used by this class (all relative to ``project_root``):

    * ``backend/skills/builtin/<skill>`` and ``backend/skills/customized/<skill>``
      form the shared registry; a customized skill shadows a builtin one of
      the same name.
    * ``runs/<config>/skills/active`` is the legacy union of shared skills.
    * ``runs/<config>/skills/activation.yaml`` is the activation manifest.
    * ``runs/<config>/agents/<agent>/skills/{local,installed,active,disabled}``
      are the per-agent skill workspaces.

    A directory only counts as a skill when it contains a ``SKILL.md`` file.
    """

    def __init__(self, project_root: Path | None = None):
        """Derive all root paths from ``project_root``.

        When ``project_root`` is falsy, the directory two levels above this
        file's parent directory is used.
        """
        self.project_root = (
            project_root or Path(__file__).resolve().parents[2]
        )
        self.builtin_root = self.project_root / "backend" / "skills" / "builtin"
        self.customized_root = (
            self.project_root / "backend" / "skills" / "customized"
        )
        self.runs_root = self.project_root / "runs"

    # ------------------------------------------------------------------
    # Path helpers
    # ------------------------------------------------------------------

    def get_active_root(self, config_name: str) -> Path:
        """Return the run-wide (legacy union) active skills directory."""
        return self.runs_root / config_name / "skills" / "active"

    def get_agent_skills_root(self, config_name: str, agent_id: str) -> Path:
        """Return the ``skills`` directory inside one agent's asset directory."""
        return self.get_agent_asset_dir(config_name, agent_id) / "skills"

    def get_agent_active_root(self, config_name: str, agent_id: str) -> Path:
        """Return the directory holding one agent's active skills."""
        return self.get_agent_skills_root(config_name, agent_id) / "active"

    def get_agent_installed_root(self, config_name: str, agent_id: str) -> Path:
        """Return the directory holding one agent's installed skills."""
        return self.get_agent_skills_root(config_name, agent_id) / "installed"

    def get_agent_disabled_root(self, config_name: str, agent_id: str) -> Path:
        """Return the directory holding one agent's disabled skills."""
        return self.get_agent_skills_root(config_name, agent_id) / "disabled"

    def get_agent_local_root(self, config_name: str, agent_id: str) -> Path:
        """Return the directory holding one agent's private local skills."""
        return self.get_agent_skills_root(config_name, agent_id) / "local"

    def get_activation_manifest_path(self, config_name: str) -> Path:
        """Return the path of the run's ``activation.yaml`` manifest."""
        return self.runs_root / config_name / "skills" / "activation.yaml"

    def get_agent_asset_dir(self, config_name: str, agent_id: str) -> Path:
        """Return the per-agent asset directory of one run."""
        return self.runs_root / config_name / "agents" / agent_id

    # ------------------------------------------------------------------
    # Catalog listing
    # ------------------------------------------------------------------

    @staticmethod
    def _iter_skill_dirs(root: Path) -> List[Path]:
        """Return direct child directories of ``root`` containing SKILL.md.

        The result is sorted by directory name; a missing ``root`` yields an
        empty list.
        """
        if not root.exists():
            return []
        return [
            child
            for child in sorted(root.iterdir(), key=lambda item: item.name)
            if child.is_dir() and (child / "SKILL.md").exists()
        ]

    def _list_skills_in(self, root: Path, source: str) -> List[SkillMetadata]:
        """Parse metadata for every skill directory directly under ``root``."""
        return [
            parse_skill_metadata(skill_dir, source=source)
            for skill_dir in self._iter_skill_dirs(root)
        ]

    def list_skill_catalog(self) -> List[SkillMetadata]:
        """Return builtin/customized skills with parsed metadata.

        Customized skills are processed after builtin ones, so a customized
        skill replaces a builtin skill with the same ``skill_name``. The
        result is sorted by ``skill_name``.
        """
        catalog: Dict[str, SkillMetadata] = {}
        for source, root in (
            ("builtin", self.builtin_root),
            ("customized", self.customized_root),
        ):
            for metadata in self._list_skills_in(root, source):
                catalog[metadata.skill_name] = metadata
        return sorted(catalog.values(), key=lambda item: item.skill_name)

    def list_agent_skill_catalog(
        self,
        config_name: str,
        agent_id: str,
    ) -> List[SkillMetadata]:
        """Return shared plus agent-local skills for one agent.

        Agent-local skills replace shared skills with the same ``skill_name``.
        """
        catalog = {
            item.skill_name: item
            for item in self.list_skill_catalog()
        }
        for item in self.list_agent_local_skills(config_name, agent_id):
            catalog[item.skill_name] = item
        return sorted(catalog.values(), key=lambda item: item.skill_name)

    def list_active_skill_metadata(
        self,
        config_name: str,
        agent_id: str,
    ) -> List[SkillMetadata]:
        """Return metadata for active skills synced for one agent."""
        return self._list_skills_in(
            self.get_agent_active_root(config_name, agent_id),
            "active",
        )

    def list_agent_local_skills(
        self,
        config_name: str,
        agent_id: str,
    ) -> List[SkillMetadata]:
        """Return metadata for agent-private local skills."""
        return self._list_skills_in(
            self.get_agent_local_root(config_name, agent_id),
            "local",
        )

    # ------------------------------------------------------------------
    # Skill documents
    # ------------------------------------------------------------------

    def load_skill_document(self, skill_name: str) -> Dict[str, object]:
        """Return skill metadata plus markdown body for one shared skill.

        Raises:
            FileNotFoundError: if the skill is in neither shared root.
        """
        source_dir = self._resolve_source_dir(skill_name)
        return self._load_skill_document_from_dir(
            source_dir,
            source="customized" if source_dir.parent == self.customized_root else "builtin",
        )

    def load_agent_skill_document(
        self,
        config_name: str,
        agent_id: str,
        skill_name: str,
    ) -> Dict[str, object]:
        """Return skill metadata plus markdown body for one agent-visible skill.

        The ``source`` label is derived from the directory the skill was
        resolved in: ``customized``, ``builtin``, ``installed`` or, failing
        those, ``local``.

        Raises:
            FileNotFoundError: if the skill cannot be resolved for the agent.
        """
        source_dir = self._resolve_agent_skill_source_dir(
            config_name=config_name,
            agent_id=agent_id,
            skill_name=skill_name,
        )
        source = "local"
        if source_dir.parent == self.customized_root:
            source = "customized"
        elif source_dir.parent == self.builtin_root:
            source = "builtin"
        elif source_dir.parent == self.get_agent_installed_root(config_name, agent_id):
            source = "installed"
        return self._load_skill_document_from_dir(source_dir, source=source)

    def create_agent_local_skill(
        self,
        config_name: str,
        agent_id: str,
        skill_name: str,
    ) -> Path:
        """Create a new local skill directory with a default SKILL.md.

        Returns:
            The path of the created skill directory.

        Raises:
            ValueError: if the name is empty after normalization.
            FileExistsError: if a local skill with that name already exists.
        """
        normalized = _normalize_skill_name(skill_name)
        if not normalized:
            raise ValueError("Skill name is required.")

        local_root = self.get_agent_local_root(config_name, agent_id)
        local_root.mkdir(parents=True, exist_ok=True)
        skill_dir = local_root / normalized
        if skill_dir.exists():
            raise FileExistsError(f"Local skill already exists: {normalized}")

        skill_dir.mkdir(parents=True, exist_ok=False)
        (skill_dir / "SKILL.md").write_text(
            "---\n"
            f"name: {normalized}\n"
            "description: 当用户提出与该本地技能相关的专门任务时,应使用此技能。\n"
            "version: 1.0.0\n"
            "---\n\n"
            f"# {normalized}\n\n"
            "在这里描述该交易员的专有分析流程、判断框架和可复用步骤。\n",
            encoding="utf-8",
        )
        return skill_dir

    def update_agent_local_skill(
        self,
        config_name: str,
        agent_id: str,
        skill_name: str,
        content: str,
    ) -> Path:
        """Overwrite one agent-local SKILL.md.

        Raises:
            ValueError: if the name is empty after normalization.
            FileNotFoundError: if the local skill directory does not exist.
        """
        normalized = _normalize_skill_name(skill_name)
        if not normalized:
            raise ValueError("Skill name is required.")

        skill_dir = self.get_agent_local_root(config_name, agent_id) / normalized
        if not skill_dir.exists():
            raise FileNotFoundError(f"Unknown local skill: {normalized}")

        (skill_dir / "SKILL.md").write_text(content, encoding="utf-8")
        return skill_dir

    def delete_agent_local_skill(
        self,
        config_name: str,
        agent_id: str,
        skill_name: str,
    ) -> None:
        """Delete one agent-local skill directory.

        Raises:
            ValueError: if the name is empty after normalization.
            FileNotFoundError: if the local skill directory does not exist.
        """
        normalized = _normalize_skill_name(skill_name)
        if not normalized:
            raise ValueError("Skill name is required.")

        skill_dir = self.get_agent_local_root(config_name, agent_id) / normalized
        if not skill_dir.exists():
            raise FileNotFoundError(f"Unknown local skill: {normalized}")

        shutil.rmtree(skill_dir)

    def _load_skill_document_from_dir(
        self,
        source_dir: Path,
        *,
        source: str,
    ) -> Dict[str, object]:
        """Return metadata plus markdown body for one resolved skill directory.

        When SKILL.md starts with a ``---`` front-matter block, only the text
        after the closing ``---`` is returned as ``content``; otherwise the
        whole (stripped) file is. A missing SKILL.md yields empty content.
        """
        metadata = parse_skill_metadata(
            source_dir,
            source=source,
        )
        skill_file = source_dir / "SKILL.md"
        raw = skill_file.read_text(encoding="utf-8").strip() if skill_file.exists() else ""
        body = raw
        if raw.startswith("---"):
            # parts == ["", <front matter>, <body>] when the block is closed.
            parts = raw.split("---", 2)
            if len(parts) >= 3:
                body = parts[2].strip()

        return {
            "skill_name": metadata.skill_name,
            "name": metadata.name,
            "description": metadata.description,
            "version": metadata.version,
            "tools": metadata.tools,
            "source": metadata.source,
            "content": body,
        }

    # ------------------------------------------------------------------
    # Per-agent overrides (agent.yaml)
    # ------------------------------------------------------------------

    @staticmethod
    def _write_agent_overrides(
        config_path: Path,
        values: Dict[str, object],
        enabled: List[str],
        disabled: List[str],
    ) -> Dict[str, List[str]]:
        """Store both override lists in ``values``, dump to YAML, return them."""
        values["enabled_skills"] = enabled
        values["disabled_skills"] = disabled
        config_path.write_text(
            yaml.safe_dump(values, allow_unicode=True, sort_keys=False),
            encoding="utf-8",
        )
        return {
            "enabled_skills": enabled,
            "disabled_skills": disabled,
        }

    def update_agent_skill_overrides(
        self,
        config_name: str,
        agent_id: str,
        *,
        enable: Iterable[str] | None = None,
        disable: Iterable[str] | None = None,
    ) -> Dict[str, List[str]]:
        """Persist per-agent enabled/disabled skill overrides in agent.yaml.

        ``enable`` is applied first, then ``disable``, so a name present in
        both ends up disabled. The enabled list keeps insertion order; the
        disabled list is written sorted.

        Returns:
            The resulting ``enabled_skills`` and ``disabled_skills`` lists.
        """
        asset_dir = self.get_agent_asset_dir(config_name, agent_id)
        asset_dir.mkdir(parents=True, exist_ok=True)
        config_path = asset_dir / "agent.yaml"
        current = load_agent_workspace_config(config_path)
        values = dict(current.values)

        enabled = _dedupe_preserve_order(current.enabled_skills)
        disabled_set = set(current.disabled_skills)

        for skill_name in enable or []:
            if skill_name not in enabled:
                enabled.append(skill_name)
            # Enabling always clears a previous "disabled" mark.
            disabled_set.discard(skill_name)

        for skill_name in disable or []:
            disabled_set.add(skill_name)
            enabled = [item for item in enabled if item != skill_name]

        return self._write_agent_overrides(
            config_path,
            values,
            enabled,
            sorted(disabled_set),
        )

    def forget_agent_skill_overrides(
        self,
        config_name: str,
        agent_id: str,
        skill_names: Iterable[str],
    ) -> Dict[str, List[str]]:
        """Remove skills from both enabled/disabled overrides in agent.yaml.

        Returns:
            The resulting ``enabled_skills`` and ``disabled_skills`` lists.
        """
        asset_dir = self.get_agent_asset_dir(config_name, agent_id)
        asset_dir.mkdir(parents=True, exist_ok=True)
        config_path = asset_dir / "agent.yaml"
        current = load_agent_workspace_config(config_path)
        values = dict(current.values)
        removed = set(skill_names)

        enabled = [item for item in current.enabled_skills if item not in removed]
        disabled = [item for item in current.disabled_skills if item not in removed]
        return self._write_agent_overrides(config_path, values, enabled, disabled)

    # ------------------------------------------------------------------
    # Activation manifest
    # ------------------------------------------------------------------

    def ensure_activation_manifest(self, config_name: str) -> Path:
        """Create the run's activation manifest with empty defaults if missing.

        Returns:
            The manifest path (existing or newly created).
        """
        manifest_path = self.get_activation_manifest_path(config_name)
        manifest_path.parent.mkdir(parents=True, exist_ok=True)
        if not manifest_path.exists():
            manifest_path.write_text(
                "global_enabled_skills: []\n"
                "global_disabled_skills: []\n"
                "agent_enabled_skills: {}\n"
                "agent_disabled_skills: {}\n",
                encoding="utf-8",
            )
        return manifest_path

    def load_activation_manifest(self, config_name: str) -> Dict[str, object]:
        """Load the activation manifest, creating it first when missing.

        Returns an empty dict when the file is empty or its top level is not
        a mapping. Individual values may still be ``None`` or of unexpected
        types, because the YAML content is not validated here.
        """
        manifest_path = self.ensure_activation_manifest(config_name)
        with open(manifest_path, "r", encoding="utf-8") as file:
            parsed = yaml.safe_load(file) or {}
        return parsed if isinstance(parsed, dict) else {}

    # ------------------------------------------------------------------
    # Skill resolution
    # ------------------------------------------------------------------

    @staticmethod
    def _is_plain_skill_name(skill_name: object) -> bool:
        """Return True when ``skill_name`` is a single, non-empty path component.

        Rejects non-strings, the empty string, ``.``/``..`` and anything with
        a directory part, so a name can never resolve to a skills root itself
        or to a location outside of it.
        """
        if not isinstance(skill_name, str) or not skill_name:
            return False
        if skill_name in (".", ".."):
            return False
        return Path(skill_name).name == skill_name

    def _resolve_source_dir(self, skill_name: str) -> Path:
        """Resolve a shared skill directory, preferring customized over builtin.

        Raises:
            FileNotFoundError: if the name is not a plain directory name or
                the skill exists in neither shared root.
        """
        if self._is_plain_skill_name(skill_name):
            customized_dir = self.customized_root / skill_name
            if customized_dir.exists():
                return customized_dir

            builtin_dir = self.builtin_root / skill_name
            if builtin_dir.exists():
                return builtin_dir

        raise FileNotFoundError(f"Unknown skill: {skill_name}")

    def _resolve_agent_skill_source_dir(
        self,
        config_name: str,
        agent_id: str,
        skill_name: str,
    ) -> Path:
        """Resolve one skill from the agent-local workspace or shared registry.

        Lookup order: agent local root, agent installed root (both require a
        SKILL.md), then the shared customized/builtin roots.

        Raises:
            FileNotFoundError: if the skill cannot be found anywhere.
        """
        if not self._is_plain_skill_name(skill_name):
            raise FileNotFoundError(f"Unknown skill: {skill_name}")

        for root in (
            self.get_agent_local_root(config_name, agent_id),
            self.get_agent_installed_root(config_name, agent_id),
        ):
            candidate = root / skill_name
            if candidate.exists() and (candidate / "SKILL.md").exists():
                return candidate
        return self._resolve_source_dir(skill_name)

    def _skill_exists_for_agent(
        self,
        config_name: str,
        agent_id: str,
        skill_name: str,
    ) -> bool:
        """Return True when the skill can be resolved for the given agent."""
        try:
            self._resolve_agent_skill_source_dir(config_name, agent_id, skill_name)
        except FileNotFoundError:
            return False
        return True

    def _persist_runtime_edits(
        self,
        config_name: str,
        skill_name: str,
        active_dir: Path,
    ) -> None:
        """
        Persist run-time edits from active skills into customized skills.

        This keeps active skill experiments from being lost on the next reload
        while still allowing the active directory to be re-synced cleanly.

        Nothing happens when ``active_dir`` is missing, is the shared source
        directory itself, or has the same content as the shared source.
        ``config_name`` is currently unused.

        Raises:
            FileNotFoundError: if ``skill_name`` is not a shared skill.
        """
        if not active_dir.exists():
            return

        source_dir = self._resolve_source_dir(skill_name)
        if active_dir.resolve() == source_dir.resolve():
            return

        if not self._directories_match(active_dir, source_dir):
            customized_dir = self.customized_root / skill_name
            customized_dir.parent.mkdir(parents=True, exist_ok=True)
            if customized_dir.exists():
                shutil.rmtree(customized_dir)
            shutil.copytree(active_dir, customized_dir)

    @staticmethod
    def _directories_match(left: Path, right: Path) -> bool:
        """Compare two directory trees by file contents.

        Returns False when either tree is missing, when the sets of relative
        paths differ, when a path is a directory on one side only, or when
        any regular file differs byte-for-byte.
        """
        if not left.exists() or not right.exists():
            return False

        left_items = sorted(
            path.relative_to(left)
            for path in left.rglob("*")
        )
        right_items = sorted(
            path.relative_to(right)
            for path in right.rglob("*")
        )
        if left_items != right_items:
            return False

        for relative_path in left_items:
            left_path = left / relative_path
            right_path = right / relative_path
            if left_path.is_dir() != right_path.is_dir():
                return False
            if left_path.is_file():
                if left_path.read_bytes() != right_path.read_bytes():
                    return False
        return True

    def resolve_agent_skill_names(
        self,
        config_name: str,
        agent_id: str,
        default_skills: Iterable[str],
    ) -> List[str]:
        """Resolve final skill names after bootstrap and activation overlays.

        Candidates are collected in this order, without duplicates: the
        bootstrap override's ``skills`` (or ``default_skills`` when the
        override has none), the agent's ``enabled_skills`` from agent.yaml,
        the manifest's global and per-agent enabled lists, and all agent-local
        skills. Names disabled by the manifest or by agent.yaml, and names
        that cannot be resolved for the agent, are then dropped.
        """
        bootstrap = get_bootstrap_config_for_run(self.project_root, config_name)
        override = bootstrap.agent_override(agent_id)
        override_skills = override.get("skills")
        # A missing or null override falls back to the caller's defaults.
        baseline = default_skills if override_skills is None else override_skills

        agent_config = load_agent_workspace_config(
            self.get_agent_asset_dir(config_name, agent_id) / "agent.yaml",
        )
        manifest = self.load_activation_manifest(config_name)

        candidates: List[str] = [
            *baseline,
            *agent_config.enabled_skills,
            *_manifest_names(manifest, "global_enabled_skills"),
            *_manifest_agent_names(manifest, "agent_enabled_skills", agent_id),
        ]
        candidates.extend(
            item.skill_name
            for item in self.list_agent_local_skills(config_name, agent_id)
        )

        disabled = set(_manifest_names(manifest, "global_disabled_skills"))
        disabled.update(
            _manifest_agent_names(manifest, "agent_disabled_skills", agent_id),
        )
        disabled.update(agent_config.disabled_skills)

        return [
            skill
            for skill in _dedupe_preserve_order(candidates)
            if skill not in disabled
            and self._skill_exists_for_agent(config_name, agent_id, skill)
        ]

    # ------------------------------------------------------------------
    # Directory syncing
    # ------------------------------------------------------------------

    def sync_skill_dirs(
        self,
        target_root: Path,
        skill_sources: Dict[str, Path],
    ) -> List[Path]:
        """Sync selected skill directories into one target root.

        Sub-directories of ``target_root`` whose name is not a key of
        ``skill_sources`` are removed; every listed skill is replaced by a
        fresh copy of its source. A source that already is the target
        directory is left untouched instead of being deleted and re-copied.

        Returns:
            The target directories, in ``skill_sources`` order.
        """
        target_root.mkdir(parents=True, exist_ok=True)
        synced_paths: List[Path] = []
        wanted = set(skill_sources)

        for existing in list(target_root.iterdir()):
            if existing.is_dir() and existing.name not in wanted:
                shutil.rmtree(existing)

        for skill_name, source_dir in skill_sources.items():
            target_dir = target_root / skill_name
            if target_dir.exists():
                if target_dir.resolve() == Path(source_dir).resolve():
                    # Removing the target would destroy the source as well.
                    synced_paths.append(target_dir)
                    continue
                shutil.rmtree(target_dir)
            shutil.copytree(source_dir, target_dir)
            synced_paths.append(target_dir)

        return synced_paths

    def sync_active_skills(
        self,
        target_root: Path,
        skill_names: Iterable[str],
    ) -> List[Path]:
        """Sync selected shared skills into one active directory.

        Raises:
            FileNotFoundError: if any name is not a shared skill.
        """
        skill_sources = {
            skill_name: self._resolve_source_dir(skill_name)
            for skill_name in skill_names
        }
        return self.sync_skill_dirs(target_root, skill_sources)

    def prepare_active_skills(
        self,
        config_name: str,
        agent_defaults: Dict[str, Iterable[str]],
    ) -> Dict[str, List[Path]]:
        """Resolve all agent skills into per-agent installed/active workspaces.

        For every agent this syncs three directories: ``installed`` (copies of
        the shared skills the agent uses), ``active`` (installed plus local
        skills, local winning on a name clash) and ``disabled`` (copies of the
        skills explicitly disabled for the agent). The run-wide union
        directory is refreshed as well.

        Returns:
            A mapping of agent id to the paths of its active skill directories.
        """
        resolved: Dict[str, List[str]] = {}
        union: List[str] = []
        for agent_id, default_skills in agent_defaults.items():
            resolved[agent_id] = self.resolve_agent_skill_names(
                config_name=config_name,
                agent_id=agent_id,
                default_skills=default_skills,
            )
            union.extend(resolved[agent_id])

        # Maintain the legacy union directory for compatibility/debugging.
        # Agent-local skills remain private to the agent workspace.
        self.sync_active_skills(
            target_root=self.get_active_root(config_name),
            skill_names=[
                skill_name
                for skill_name in _dedupe_preserve_order(union)
                if self._is_shared_skill(skill_name)
            ],
        )

        active_map: Dict[str, List[Path]] = {}
        for agent_id, skill_names in resolved.items():
            local_root = self.get_agent_local_root(config_name, agent_id)
            installed_root = self.get_agent_installed_root(config_name, agent_id)

            local_sources: Dict[str, Path] = {}
            installed_sources: Dict[str, Path] = {}
            for skill_name in skill_names:
                local_dir = local_root / skill_name
                # Same "is a skill" rule as resolution: SKILL.md must exist.
                if (local_dir / "SKILL.md").exists():
                    local_sources[skill_name] = local_dir
                    continue
                try:
                    installed_sources[skill_name] = self._resolve_source_dir(skill_name)
                except FileNotFoundError:
                    # The skill is only visible through an already installed
                    # copy (no shared source); keep that copy in place.
                    kept_dir = installed_root / skill_name
                    if (kept_dir / "SKILL.md").exists():
                        installed_sources[skill_name] = kept_dir

            installed_paths = self.sync_skill_dirs(
                target_root=installed_root,
                skill_sources=installed_sources,
            )

            active_sources = {
                path.name: path
                for path in installed_paths
            }
            active_sources.update(local_sources)
            active_map[agent_id] = self.sync_skill_dirs(
                target_root=self.get_agent_active_root(config_name, agent_id),
                skill_sources=active_sources,
            )

            disabled_names = _dedupe_preserve_order(
                self._resolve_disabled_skill_names(
                    config_name=config_name,
                    agent_id=agent_id,
                    default_skills=agent_defaults.get(agent_id, []),
                ),
            )
            disabled_sources = {
                skill_name: self._resolve_agent_skill_source_dir(
                    config_name=config_name,
                    agent_id=agent_id,
                    skill_name=skill_name,
                )
                for skill_name in disabled_names
            }
            self.sync_skill_dirs(
                target_root=self.get_agent_disabled_root(config_name, agent_id),
                skill_sources=disabled_sources,
            )
        return active_map

    def _is_shared_skill(self, skill_name: str) -> bool:
        """Return True when the skill exists in the customized or builtin root."""
        try:
            self._resolve_source_dir(skill_name)
        except FileNotFoundError:
            return False
        return True

    def _resolve_disabled_skill_names(
        self,
        config_name: str,
        agent_id: str,
        default_skills: Iterable[str],
    ) -> List[str]:
        """Resolve explicit disabled skills for one agent.

        Combines the manifest's global and per-agent disabled lists with the
        agent's ``disabled_skills`` from agent.yaml, removes duplicates and
        keeps only names that can be resolved for the agent.
        ``default_skills`` is accepted for interface compatibility; it does
        not influence the result.
        """
        agent_config = load_agent_workspace_config(
            self.get_agent_asset_dir(config_name, agent_id) / "agent.yaml",
        )
        manifest = self.load_activation_manifest(config_name)

        disabled: List[str] = [
            *_manifest_names(manifest, "global_disabled_skills"),
            *_manifest_agent_names(manifest, "agent_disabled_skills", agent_id),
            *agent_config.disabled_skills,
        ]

        return [
            skill
            for skill in _dedupe_preserve_order(disabled)
            if self._skill_exists_for_agent(config_name, agent_id, skill)
        ]


def _manifest_names(manifest: Dict[str, object], key: str) -> List[str]:
    """Return the string entries of a manifest list, or [] when absent/null."""
    return _string_items(manifest.get(key))


def _manifest_agent_names(
    manifest: Dict[str, object],
    key: str,
    agent_id: str,
) -> List[str]:
    """Return one agent's string entries from a manifest mapping of lists.

    Yields [] when the mapping or the agent's entry is absent, null or of an
    unexpected type.
    """
    per_agent = manifest.get(key)
    if not isinstance(per_agent, dict):
        return []
    return _string_items(per_agent.get(agent_id))


def _string_items(value: object) -> List[str]:
    """Return the ``str`` members of a list/tuple value, else an empty list."""
    if not isinstance(value, (list, tuple)):
        return []
    return [item for item in value if isinstance(item, str)]


def _dedupe_preserve_order(items: Iterable[str]) -> List[str]:
    """Return ``items`` without duplicates, keeping first-seen order."""
    return list(dict.fromkeys(items))


def _normalize_skill_name(raw_name: str) -> str:
    """Turn a user-supplied name into a safe skill directory name.

    Lower-cases the trimmed input, maps spaces and hyphens to underscores,
    drops every character that is neither alphanumeric nor an underscore, and
    strips leading/trailing underscores. Falsy input yields ``""``.
    """
    normalized = str(raw_name or "").strip().lower().replace(" ", "_").replace("-", "_")
    allowed = [ch for ch in normalized if ch.isalnum() or ch == "_"]
    return "".join(allowed).strip("_")