# -*- coding: utf-8 -*-
"""Manage agent-installed and run-active skill directories for each run."""
from contextlib import nullcontext
from pathlib import Path
import shutil
import tempfile
import zipfile
from threading import Lock
from typing import Any, Dict, Iterable, List, Optional, Set
from urllib.parse import urlparse
from urllib.request import urlretrieve

import yaml

from backend.agents.agent_workspace import load_agent_workspace_config
from backend.agents.skill_metadata import SkillMetadata, parse_skill_metadata
from backend.agents.skill_loader import validate_skill
from backend.config.bootstrap_config import get_bootstrap_config_for_run

try:
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler, FileSystemEvent

    WATCHDOG_AVAILABLE = True
except ImportError:
    WATCHDOG_AVAILABLE = False
    Observer = None
    FileSystemEventHandler = object
    FileSystemEvent = object  # type: ignore[misc,assignment]


class SkillsManager:
    """Sync named skills into a run-scoped active skills workspace.

    Directory layout used by this class (relative to ``project_root``):

    * ``backend/skills/builtin/<skill>`` and ``backend/skills/customized/<skill>``
      form the shared registry; a customized entry shadows a builtin one.
    * ``runs/<config>/skills/active`` is a legacy union of shared skills in use.
    * ``runs/<config>/agents/<agent>/skills/{installed,active,disabled,local}``
      are the per-agent workspaces.
    """

    # Legacy class-level reference kept for migration compatibility. Every
    # instance shadows it with its own dict in ``__init__``.
    _pending_skill_changes: Dict[str, Set[Path]] = {}

    def __init__(self, project_root: Path | None = None):
        self.project_root = project_root or Path(__file__).resolve().parents[2]
        self.builtin_root = self.project_root / "backend" / "skills" / "builtin"
        self.customized_root = self.project_root / "backend" / "skills" / "customized"
        self.runs_root = self.project_root / "runs"
        self._lock = Lock()
        # Instance-level pending skill changes keyed by config name
        # (thread-safe via self._lock).
        self._pending_skill_changes: Dict[str, Set[Path]] = {}
        # Watchers started by prepare_active_skills(auto_reload=True), keyed by
        # config name, so they can be stopped instead of leaking threads.
        self._watchers: Dict[str, "_SkillsWatcher"] = {}

    # ------------------------------------------------------------------
    # Path helpers
    # ------------------------------------------------------------------

    def get_active_root(self, config_name: str) -> Path:
        return self.runs_root / config_name / "skills" / "active"

    def get_agent_skills_root(self, config_name: str, agent_id: str) -> Path:
        return self.get_agent_asset_dir(config_name, agent_id) / "skills"

    def get_agent_active_root(self, config_name: str, agent_id: str) -> Path:
        return self.get_agent_skills_root(config_name, agent_id) / "active"

    def get_agent_installed_root(self, config_name: str, agent_id: str) -> Path:
        return self.get_agent_skills_root(config_name, agent_id) / "installed"

    def get_agent_disabled_root(self, config_name: str, agent_id: str) -> Path:
        return self.get_agent_skills_root(config_name, agent_id) / "disabled"

    def get_agent_local_root(self, config_name: str, agent_id: str) -> Path:
        return self.get_agent_skills_root(config_name, agent_id) / "local"

    def get_activation_manifest_path(self, config_name: str) -> Path:
        return self.runs_root / config_name / "skills" / "activation.yaml"

    def get_agent_asset_dir(self, config_name: str, agent_id: str) -> Path:
        return self.runs_root / config_name / "agents" / agent_id

    # ------------------------------------------------------------------
    # Catalog / metadata
    # ------------------------------------------------------------------

    @staticmethod
    def _iter_skill_dirs(root: Path) -> List[Path]:
        """Return child directories of *root* containing SKILL.md, sorted by name."""
        if not root.exists():
            return []
        return [
            skill_dir
            for skill_dir in sorted(root.iterdir(), key=lambda item: item.name)
            if skill_dir.is_dir() and (skill_dir / "SKILL.md").exists()
        ]

    def list_skill_catalog(self) -> List[SkillMetadata]:
        """Return builtin/customized skills with parsed metadata.

        Customized skills are scanned after builtin ones, so a customized skill
        with the same ``skill_name`` replaces the builtin entry.
        """
        catalog: Dict[str, SkillMetadata] = {}
        for source, root in (
            ("builtin", self.builtin_root),
            ("customized", self.customized_root),
        ):
            for skill_dir in self._iter_skill_dirs(root):
                metadata = parse_skill_metadata(skill_dir, source=source)
                catalog[metadata.skill_name] = metadata
        return sorted(catalog.values(), key=lambda item: item.skill_name)

    def list_agent_skill_catalog(
        self,
        config_name: str,
        agent_id: str,
    ) -> List[SkillMetadata]:
        """Return shared plus agent-local skills for one agent.

        Agent-local skills replace shared skills that have the same name.
        """
        catalog = {item.skill_name: item for item in self.list_skill_catalog()}
        for item in self.list_agent_local_skills(config_name, agent_id):
            catalog[item.skill_name] = item
        return sorted(catalog.values(), key=lambda item: item.skill_name)

    def list_active_skill_metadata(
        self,
        config_name: str,
        agent_id: str,
    ) -> List[SkillMetadata]:
        """Return metadata for active skills synced for one agent."""
        active_root = self.get_agent_active_root(config_name, agent_id)
        return [
            parse_skill_metadata(skill_dir, source="active")
            for skill_dir in self._iter_skill_dirs(active_root)
        ]

    def list_agent_local_skills(
        self,
        config_name: str,
        agent_id: str,
    ) -> List[SkillMetadata]:
        """Return metadata for agent-private local skills."""
        local_root = self.get_agent_local_root(config_name, agent_id)
        return [
            parse_skill_metadata(skill_dir, source="local")
            for skill_dir in self._iter_skill_dirs(local_root)
        ]

    def load_skill_document(self, skill_name: str) -> Dict[str, object]:
        """Return skill metadata plus markdown body for one shared skill.

        Raises:
            FileNotFoundError: if the skill is neither customized nor builtin.
        """
        source_dir = self._resolve_source_dir(skill_name)
        return self._load_skill_document_from_dir(
            source_dir,
            source="customized" if source_dir.parent == self.customized_root else "builtin",
        )

    def load_agent_skill_document(
        self,
        config_name: str,
        agent_id: str,
        skill_name: str,
    ) -> Dict[str, object]:
        """Return skill metadata plus markdown body for one agent-visible skill.

        The ``source`` field is derived from the directory the skill resolved
        to: ``customized``, ``builtin``, ``installed`` or ``local``.
        """
        source_dir = self._resolve_agent_skill_source_dir(
            config_name=config_name,
            agent_id=agent_id,
            skill_name=skill_name,
        )
        source = "local"
        if source_dir.parent == self.customized_root:
            source = "customized"
        elif source_dir.parent == self.builtin_root:
            source = "builtin"
        elif source_dir.parent == self.get_agent_installed_root(config_name, agent_id):
            source = "installed"
        return self._load_skill_document_from_dir(source_dir, source=source)

    # ------------------------------------------------------------------
    # Agent-local skill management
    # ------------------------------------------------------------------

    def create_agent_local_skill(
        self,
        config_name: str,
        agent_id: str,
        skill_name: str,
    ) -> Path:
        """Create a new local skill directory with a default SKILL.md.

        Raises:
            ValueError: if the normalized name is empty.
            FileExistsError: if a local skill with that name already exists.
        """
        normalized = _normalize_skill_name(skill_name)
        if not normalized:
            raise ValueError("Skill name is required.")
        local_root = self.get_agent_local_root(config_name, agent_id)
        local_root.mkdir(parents=True, exist_ok=True)
        skill_dir = local_root / normalized
        if skill_dir.exists():
            raise FileExistsError(f"Local skill already exists: {normalized}")
        skill_dir.mkdir(parents=True, exist_ok=False)
        (skill_dir / "SKILL.md").write_text(
            "---\n"
            f"name: {normalized}\n"
            "description: 当用户提出与该本地技能相关的专门任务时,应使用此技能。\n"
            "version: 1.0.0\n"
            "---\n\n"
            f"# {normalized}\n\n"
            "在这里描述该交易员的专有分析流程、判断框架和可复用步骤。\n",
            encoding="utf-8",
        )
        return skill_dir

    def install_external_skill_for_agent(
        self,
        config_name: str,
        agent_id: str,
        source: str,
        *,
        skill_name: str | None = None,
        activate: bool = True,
    ) -> Dict[str, object]:
        """
        Install an external skill into one agent's local skill space.

        Supports:
        - local skill directory containing SKILL.md
        - local zip archive containing one skill directory
        - http(s) URL to zip archive

        An existing local skill with the same name is replaced only if the new
        copy passes ``validate_skill``; otherwise the previous copy is restored.
        Temporary downloads and extraction directories are always removed.

        Raises:
            ValueError: if the source is malformed, the name cannot be
                determined, or the installed skill fails validation.
            FileNotFoundError: if a local source path does not exist.
        """
        temp_paths: List[Path] = []
        try:
            source_path = self._resolve_external_source_path(source, temp_paths=temp_paths)
            skill_dir = self._resolve_external_skill_dir(source_path, temp_paths=temp_paths)
            metadata = parse_skill_metadata(skill_dir, source="external")
            final_name = _normalize_skill_name(
                skill_name or metadata.skill_name or skill_dir.name
            )
            if not final_name:
                raise ValueError("Could not determine skill name from external source.")

            target_dir = self.get_agent_local_root(config_name, agent_id) / final_name
            target_dir.parent.mkdir(parents=True, exist_ok=True)

            # Move any existing copy aside instead of deleting it, so a failed
            # install (invalid package, copy error) does not lose the old skill.
            backup_dir: Optional[Path] = None
            if target_dir.exists():
                backup_root = Path(tempfile.mkdtemp(prefix="external_skill_backup_"))
                temp_paths.append(backup_root)
                backup_dir = backup_root / final_name
                shutil.move(str(target_dir), str(backup_dir))

            try:
                shutil.copytree(skill_dir, target_dir)
                validation = validate_skill(target_dir)
                if not validation.get("valid", False):
                    raise ValueError(
                        "Installed skill is invalid: "
                        + "; ".join(validation.get("errors", []))
                    )
            except BaseException:
                shutil.rmtree(target_dir, ignore_errors=True)
                if backup_dir is not None:
                    shutil.move(str(backup_dir), str(target_dir))
                raise
        finally:
            for path in temp_paths:
                _remove_path(path)

        if activate:
            self.update_agent_skill_overrides(
                config_name=config_name,
                agent_id=agent_id,
                enable=[final_name],
            )
        return {
            "skill_name": final_name,
            "target_dir": str(target_dir),
            "activated": activate,
            "warnings": validation.get("warnings", []),
        }

    def update_agent_local_skill(
        self,
        config_name: str,
        agent_id: str,
        skill_name: str,
        content: str,
    ) -> Path:
        """Overwrite one agent-local SKILL.md.

        Raises:
            ValueError: if the normalized name is empty.
            FileNotFoundError: if the local skill directory does not exist.
        """
        normalized = _normalize_skill_name(skill_name)
        if not normalized:
            raise ValueError("Skill name is required.")
        skill_dir = self.get_agent_local_root(config_name, agent_id) / normalized
        if not skill_dir.exists():
            raise FileNotFoundError(f"Unknown local skill: {normalized}")
        (skill_dir / "SKILL.md").write_text(content, encoding="utf-8")
        return skill_dir

    def delete_agent_local_skill(
        self,
        config_name: str,
        agent_id: str,
        skill_name: str,
    ) -> None:
        """Delete one agent-local skill directory.

        Raises:
            ValueError: if the normalized name is empty.
            FileNotFoundError: if the local skill directory does not exist.
        """
        normalized = _normalize_skill_name(skill_name)
        if not normalized:
            raise ValueError("Skill name is required.")
        skill_dir = self.get_agent_local_root(config_name, agent_id) / normalized
        if not skill_dir.exists():
            raise FileNotFoundError(f"Unknown local skill: {normalized}")
        shutil.rmtree(skill_dir)

    def _load_skill_document_from_dir(
        self,
        source_dir: Path,
        *,
        source: str,
    ) -> Dict[str, object]:
        """Return metadata plus markdown body for one resolved skill directory.

        A leading ``---`` front-matter section, if present, is stripped from
        the returned ``content``.
        """
        metadata = parse_skill_metadata(
            source_dir,
            source=source,
        )
        skill_file = source_dir / "SKILL.md"
        raw = skill_file.read_text(encoding="utf-8").strip() if skill_file.exists() else ""
        body = raw
        if raw.startswith("---"):
            parts = raw.split("---", 2)
            if len(parts) >= 3:
                body = parts[2].strip()
        return {
            "skill_name": metadata.skill_name,
            "name": metadata.name,
            "description": metadata.description,
            "version": metadata.version,
            "tools": metadata.tools,
            "source": metadata.source,
            "content": body,
        }

    def _resolve_external_source_path(
        self,
        source: str,
        temp_paths: Optional[List[Path]] = None,
    ) -> Path:
        """Resolve source into a local path; download URL when needed.

        Args:
            source: Local path, or an http(s) URL to download.
            temp_paths: If given, the downloaded temporary file is appended so
                the caller can delete it once the install is finished.
        """
        parsed = urlparse(source)
        if parsed.scheme in {"http", "https"}:
            suffix = Path(parsed.path).suffix or ".zip"
            with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
                temp_path = Path(tmp.name)
            if temp_paths is not None:
                temp_paths.append(temp_path)
            try:
                urlretrieve(source, temp_path)
            except BaseException:
                # Don't leave an empty or partial download behind.
                _remove_path(temp_path)
                raise
            return temp_path
        return Path(source).expanduser().resolve()

    def _resolve_external_skill_dir(
        self,
        source_path: Path,
        temp_paths: Optional[List[Path]] = None,
    ) -> Path:
        """Resolve external source path to a skill directory containing SKILL.md.

        Args:
            source_path: A directory, or a ``.zip`` archive to extract.
            temp_paths: If given, the temporary extraction directory is
                appended so the caller can delete it afterwards.

        Raises:
            FileNotFoundError: if ``source_path`` does not exist.
            ValueError: if no single skill directory can be identified.
        """
        if not source_path.exists():
            raise FileNotFoundError(f"Source does not exist: {source_path}")
        if source_path.is_dir():
            if (source_path / "SKILL.md").exists():
                return source_path
            children = [
                item
                for item in source_path.iterdir()
                if item.is_dir() and (item / "SKILL.md").exists()
            ]
            if len(children) == 1:
                return children[0]
            raise ValueError(
                "Source directory must contain SKILL.md "
                "or exactly one child directory containing SKILL.md."
            )
        if source_path.suffix.lower() != ".zip":
            raise ValueError("External source file must be a .zip archive.")

        temp_root = Path(tempfile.mkdtemp(prefix="external_skill_"))
        if temp_paths is not None:
            temp_paths.append(temp_root)
        # zipfile.extractall strips absolute paths and ".." components from
        # member names, which keeps extraction inside temp_root.
        with zipfile.ZipFile(source_path, "r") as archive:
            archive.extractall(temp_root)
        skill_dirs = {
            item.parent for item in temp_root.rglob("SKILL.md") if item.is_file()
        }
        if len(skill_dirs) != 1:
            raise ValueError(
                "Zip archive must contain exactly one skill directory with SKILL.md."
            )
        return next(iter(skill_dirs))

    # ------------------------------------------------------------------
    # Overrides / activation manifest
    # ------------------------------------------------------------------

    def update_agent_skill_overrides(
        self,
        config_name: str,
        agent_id: str,
        *,
        enable: Iterable[str] | None = None,
        disable: Iterable[str] | None = None,
    ) -> Dict[str, List[str]]:
        """Persist per-agent enabled/disabled skill overrides in agent.yaml.

        ``enable`` entries are applied first and ``disable`` entries second,
        so a name present in both ends up disabled.
        """
        asset_dir = self.get_agent_asset_dir(config_name, agent_id)
        asset_dir.mkdir(parents=True, exist_ok=True)
        config_path = asset_dir / "agent.yaml"
        current = load_agent_workspace_config(config_path)
        values = dict(current.values)
        enabled = _dedupe_preserve_order(current.enabled_skills)
        disabled_set = set(current.disabled_skills)

        for skill_name in enable or []:
            if skill_name not in enabled:
                enabled.append(skill_name)
            disabled_set.discard(skill_name)
        for skill_name in disable or []:
            disabled_set.add(skill_name)
            enabled = [item for item in enabled if item != skill_name]

        values["enabled_skills"] = enabled
        values["disabled_skills"] = sorted(disabled_set)
        config_path.write_text(
            yaml.safe_dump(values, allow_unicode=True, sort_keys=False),
            encoding="utf-8",
        )
        return {
            "enabled_skills": enabled,
            "disabled_skills": sorted(disabled_set),
        }

    def forget_agent_skill_overrides(
        self,
        config_name: str,
        agent_id: str,
        skill_names: Iterable[str],
    ) -> Dict[str, List[str]]:
        """Remove skills from both enabled/disabled overrides in agent.yaml."""
        asset_dir = self.get_agent_asset_dir(config_name, agent_id)
        asset_dir.mkdir(parents=True, exist_ok=True)
        config_path = asset_dir / "agent.yaml"
        current = load_agent_workspace_config(config_path)
        values = dict(current.values)
        removed = set(skill_names)
        enabled = [item for item in current.enabled_skills if item not in removed]
        disabled = [item for item in current.disabled_skills if item not in removed]
        values["enabled_skills"] = enabled
        values["disabled_skills"] = disabled
        config_path.write_text(
            yaml.safe_dump(values, allow_unicode=True, sort_keys=False),
            encoding="utf-8",
        )
        return {
            "enabled_skills": enabled,
            "disabled_skills": disabled,
        }

    def ensure_activation_manifest(self, config_name: str) -> Path:
        """Create an empty activation.yaml for the run if missing; return its path."""
        manifest_path = self.get_activation_manifest_path(config_name)
        manifest_path.parent.mkdir(parents=True, exist_ok=True)
        if not manifest_path.exists():
            manifest_path.write_text(
                "global_enabled_skills: []\n"
                "global_disabled_skills: []\n"
                "agent_enabled_skills: {}\n"
                "agent_disabled_skills: {}\n",
                encoding="utf-8",
            )
        return manifest_path

    def load_activation_manifest(self, config_name: str) -> Dict[str, object]:
        """Load activation.yaml (creating it first); non-mapping content yields {}."""
        manifest_path = self.ensure_activation_manifest(config_name)
        with open(manifest_path, "r", encoding="utf-8") as file:
            parsed = yaml.safe_load(file) or {}
        return parsed if isinstance(parsed, dict) else {}

    # ------------------------------------------------------------------
    # Skill resolution
    # ------------------------------------------------------------------

    def _resolve_source_dir(self, skill_name: str) -> Path:
        """Return the shared skill dir; customized shadows builtin.

        Raises:
            FileNotFoundError: if neither directory exists.
        """
        customized_dir = self.customized_root / skill_name
        if customized_dir.exists():
            return customized_dir
        builtin_dir = self.builtin_root / skill_name
        if builtin_dir.exists():
            return builtin_dir
        raise FileNotFoundError(f"Unknown skill: {skill_name}")

    def _resolve_agent_skill_source_dir(
        self,
        config_name: str,
        agent_id: str,
        skill_name: str,
    ) -> Path:
        """Resolve one skill from the agent-local workspace or shared registry.

        Lookup order: agent ``local/``, agent ``installed/``, then the shared
        customized/builtin registry.
        """
        for root in (
            self.get_agent_local_root(config_name, agent_id),
            self.get_agent_installed_root(config_name, agent_id),
        ):
            candidate = root / skill_name
            if candidate.exists() and (candidate / "SKILL.md").exists():
                return candidate
        return self._resolve_source_dir(skill_name)

    def _skill_exists_for_agent(
        self,
        config_name: str,
        agent_id: str,
        skill_name: str,
    ) -> bool:
        try:
            self._resolve_agent_skill_source_dir(config_name, agent_id, skill_name)
        except FileNotFoundError:
            return False
        return True

    def _persist_runtime_edits(
        self,
        config_name: str,
        skill_name: str,
        active_dir: Path,
    ) -> None:
        """
        Persist run-time edits from active skills into customized skills.

        This keeps active skill experiments from being lost on the next reload
        while still allowing the active directory to be re-synced cleanly.
        """
        if not active_dir.exists():
            return
        source_dir = self._resolve_source_dir(skill_name)
        if active_dir.resolve() == source_dir.resolve():
            return
        if not self._directories_match(active_dir, source_dir):
            customized_dir = self.customized_root / skill_name
            customized_dir.parent.mkdir(parents=True, exist_ok=True)
            if customized_dir.exists():
                shutil.rmtree(customized_dir)
            shutil.copytree(active_dir, customized_dir)

    @staticmethod
    def _directories_match(left: Path, right: Path) -> bool:
        """Compare two directory trees by file contents."""
        if not left.exists() or not right.exists():
            return False
        left_items = sorted(path.relative_to(left) for path in left.rglob("*"))
        right_items = sorted(path.relative_to(right) for path in right.rglob("*"))
        if left_items != right_items:
            return False
        for relative_path in left_items:
            left_path = left / relative_path
            right_path = right / relative_path
            if left_path.is_dir() != right_path.is_dir():
                return False
            if left_path.is_file():
                if left_path.read_bytes() != right_path.read_bytes():
                    return False
        return True

    def resolve_agent_skill_names(
        self,
        config_name: str,
        agent_id: str,
        default_skills: Iterable[str],
    ) -> List[str]:
        """Resolve final skill names after bootstrap and activation overlays.

        The order is:
        1. bootstrap ``skills`` override, or ``default_skills``;
        2. agent.yaml enables;
        3. manifest global enables;
        4. manifest per-agent enables;
        5. all agent-local skills.

        Names disabled by the manifest or agent.yaml, and names that do not
        exist for the agent, are filtered out.
        """
        bootstrap = get_bootstrap_config_for_run(self.project_root, config_name)
        override = bootstrap.agent_override(agent_id)
        skills = list(override.get("skills", list(default_skills)))

        agent_config = load_agent_workspace_config(
            self.get_agent_asset_dir(config_name, agent_id) / "agent.yaml",
        )
        manifest = self.load_activation_manifest(config_name)

        additions = [
            *agent_config.enabled_skills,
            *_manifest_list(manifest, "global_enabled_skills"),
            *_manifest_agent_list(manifest, "agent_enabled_skills", agent_id),
        ]
        for skill_name in additions:
            if skill_name not in skills:
                skills.append(skill_name)

        disabled = set(_manifest_list(manifest, "global_disabled_skills"))
        disabled.update(_manifest_agent_list(manifest, "agent_disabled_skills", agent_id))
        disabled.update(agent_config.disabled_skills)

        for item in self.list_agent_local_skills(config_name, agent_id):
            if item.skill_name not in skills:
                skills.append(item.skill_name)

        return [
            skill
            for skill in skills
            if skill not in disabled
            and self._skill_exists_for_agent(config_name, agent_id, skill)
        ]

    # ------------------------------------------------------------------
    # Syncing
    # ------------------------------------------------------------------

    def sync_skill_dirs(
        self,
        target_root: Path,
        skill_sources: Dict[str, Path],
    ) -> List[Path]:
        """Sync selected skill directories into one target root.

        Subdirectories of ``target_root`` not named in ``skill_sources`` are
        removed. Each wanted skill is replaced by a fresh copy of its source.

        Returns:
            Target paths in ``skill_sources`` iteration order.
        """
        target_root.mkdir(parents=True, exist_ok=True)
        synced_paths: List[Path] = []
        wanted = set(skill_sources)

        for existing in target_root.iterdir():
            if existing.is_dir() and existing.name not in wanted:
                shutil.rmtree(existing)

        for skill_name, source_dir in skill_sources.items():
            target_dir = target_root / skill_name
            if target_dir.exists():
                if target_dir.resolve() == Path(source_dir).resolve():
                    # Source already is the target: deleting it first would
                    # destroy the only copy.
                    synced_paths.append(target_dir)
                    continue
                shutil.rmtree(target_dir)
            shutil.copytree(source_dir, target_dir)
            synced_paths.append(target_dir)
        return synced_paths

    def sync_active_skills(
        self,
        target_root: Path,
        skill_names: Iterable[str],
    ) -> List[Path]:
        """Sync selected shared skills into one active directory."""
        skill_sources = {
            skill_name: self._resolve_source_dir(skill_name)
            for skill_name in skill_names
        }
        return self.sync_skill_dirs(target_root, skill_sources)

    def prepare_active_skills(
        self,
        config_name: str,
        agent_defaults: Dict[str, Iterable[str]],
        auto_reload: bool = False,
    ) -> Dict[str, List[Path]]:
        """Resolve all agent skills into per-agent installed/active workspaces.

        For each agent:
        * shared skills are copied into ``installed/``;
        * ``active/`` gets those installed copies plus agent-local skills;
        * explicitly disabled skills are copied into ``disabled/``.

        Args:
            config_name: Run configuration name.
            agent_defaults: Map of agent_id -> default skill names.
            auto_reload: Start (or restart) a file watcher for this run. The
                watcher is kept on the manager; see ``stop_watching``.

        Returns:
            Map of agent_id -> active skill directories.
        """
        # Materialize once: defaults are read again when resolving disabled
        # skills, and a one-shot iterator would be empty the second time.
        defaults = {
            agent_id: list(default_skills)
            for agent_id, default_skills in agent_defaults.items()
        }

        resolved: Dict[str, List[str]] = {}
        union: List[str] = []
        for agent_id, default_skills in defaults.items():
            resolved_skills = self.resolve_agent_skill_names(
                config_name=config_name,
                agent_id=agent_id,
                default_skills=default_skills,
            )
            resolved[agent_id] = resolved_skills
            for skill_name in resolved_skills:
                if skill_name not in union:
                    union.append(skill_name)

        # Maintain the legacy union directory for compatibility/debugging.
        # Agent-local skills remain private to the agent workspace.
        self.sync_active_skills(
            target_root=self.get_active_root(config_name),
            skill_names=[
                skill_name for skill_name in union if self._is_shared_skill(skill_name)
            ],
        )

        active_map: Dict[str, List[Path]] = {}
        for agent_id, skill_names in resolved.items():
            local_root = self.get_agent_local_root(config_name, agent_id)
            local_sources = {
                skill_name: local_root / skill_name
                for skill_name in skill_names
                if (local_root / skill_name / "SKILL.md").exists()
            }
            # Only shared skills are (re)installed. A name that is neither
            # local nor shared (e.g. a stale installed/ copy of a skill removed
            # from the registry) is dropped here instead of raising.
            installed_sources: Dict[str, Path] = {}
            for skill_name in skill_names:
                if skill_name in local_sources:
                    continue
                try:
                    installed_sources[skill_name] = self._resolve_source_dir(skill_name)
                except FileNotFoundError:
                    continue
            installed_paths = self.sync_skill_dirs(
                target_root=self.get_agent_installed_root(config_name, agent_id),
                skill_sources=installed_sources,
            )

            active_sources = {path.name: path for path in installed_paths}
            active_sources.update(local_sources)
            active_map[agent_id] = self.sync_skill_dirs(
                target_root=self.get_agent_active_root(config_name, agent_id),
                skill_sources=active_sources,
            )

            disabled_names = _dedupe_preserve_order(
                self._resolve_disabled_skill_names(
                    config_name=config_name,
                    agent_id=agent_id,
                    default_skills=defaults.get(agent_id, []),
                ),
            )
            disabled_sources = {
                skill_name: self._resolve_agent_skill_source_dir(
                    config_name=config_name,
                    agent_id=agent_id,
                    skill_name=skill_name,
                )
                for skill_name in disabled_names
            }
            self.sync_skill_dirs(
                target_root=self.get_agent_disabled_root(config_name, agent_id),
                skill_sources=disabled_sources,
            )

        if auto_reload:
            # Replace, rather than leak, any watcher already running for this run.
            self.stop_watching(config_name)
            self._watchers[config_name] = self.watch_active_skills(config_name, defaults)

        return active_map

    def _is_shared_skill(self, skill_name: str) -> bool:
        try:
            self._resolve_source_dir(skill_name)
        except FileNotFoundError:
            return False
        return True

    # ------------------------------------------------------------------
    # File watching / hot reload
    # ------------------------------------------------------------------

    def watch_active_skills(
        self,
        config_name: str,
        agent_defaults: Dict[str, Iterable[str]],
        callback: Optional[Any] = None,
    ) -> "_SkillsWatcher":
        """Start file system monitoring on active skill directories.

        Args:
            config_name: Run configuration name.
            agent_defaults: Map of agent_id -> default skill names (only the
                agent ids are used here).
            callback: Optional callable invoked on file changes with
                (changed_paths: List[Path]).

        Returns:
            A _SkillsWatcher instance. Call .stop() to halt monitoring.

        Raises:
            ImportError: if watchdog is not installed.
        """
        if not WATCHDOG_AVAILABLE:
            raise ImportError(
                "watchdog is required for watch_active_skills. "
                "Install it with: pip install watchdog"
            )

        watched_paths: List[Path] = []
        for agent_id in agent_defaults:
            for root in (
                self.get_agent_active_root(config_name, agent_id),
                self.get_agent_local_root(config_name, agent_id),
            ):
                if root.exists():
                    watched_paths.append(root)

        # Pass the run id explicitly: inferring it from the path breaks when
        # the project path itself contains a "runs" component.
        handler = _SkillsChangeHandler(
            watched_paths,
            self._pending_skill_changes,
            callback,
            self._lock,
            run_id=config_name,
        )
        observer = Observer()
        for path in watched_paths:
            observer.schedule(handler, str(path), recursive=True)
        observer.start()

        return _SkillsWatcher(observer, handler)

    def stop_watching(self, config_name: Optional[str] = None) -> None:
        """Stop watchers started by ``prepare_active_skills(auto_reload=True)``.

        Args:
            config_name: Stop only this run's watcher; ``None`` stops all.
        """
        names = list(self._watchers) if config_name is None else [config_name]
        for name in names:
            watcher = self._watchers.pop(name, None)
            if watcher is not None:
                watcher.stop()

    def reload_skills_if_changed(
        self,
        config_name: str,
        agent_defaults: Dict[str, Iterable[str]],
    ) -> Dict[str, List[Path]]:
        """Check for file changes and reload active skills if needed.

        Args:
            config_name: Run configuration name.
            agent_defaults: Map of agent_id -> default skill names.

        Returns:
            Map of agent_id -> list of reloaded skill paths, or empty dict
            if no changes were detected.
        """
        with self._lock:
            changed = self._pending_skill_changes.get(config_name)
            if not changed:
                return {}
            self._pending_skill_changes[config_name] = set()
        # NOTE(review): the re-sync below rewrites watched active/ directories,
        # which presumably produces new watcher events and another reload on
        # the next poll — confirm whether that is acceptable.
        return self.prepare_active_skills(config_name, agent_defaults)

    def _resolve_disabled_skill_names(
        self,
        config_name: str,
        agent_id: str,
        default_skills: Iterable[str],
    ) -> List[str]:
        """Resolve explicit disabled skills for one agent.

        Sources, in order:
        1. the manifest's global disabled list;
        2. the manifest's per-agent disabled list;
        3. agent.yaml ``disabled_skills``.

        The result is deduplicated and limited to skills that exist for the
        agent. ``default_skills`` is accepted for signature compatibility; the
        explicit disables do not depend on the baseline skill set.
        """
        agent_config = load_agent_workspace_config(
            self.get_agent_asset_dir(config_name, agent_id) / "agent.yaml",
        )
        manifest = self.load_activation_manifest(config_name)
        disabled = _dedupe_preserve_order(
            [
                *_manifest_list(manifest, "global_disabled_skills"),
                *_manifest_agent_list(manifest, "agent_disabled_skills", agent_id),
                *agent_config.disabled_skills,
            ]
        )
        return [
            skill
            for skill in disabled
            if self._skill_exists_for_agent(config_name, agent_id, skill)
        ]


class _SkillsWatcher:
    """Handle returned by watch_active_skills; call .stop() to halt monitoring."""

    def __init__(self, observer: Observer, handler: "_SkillsChangeHandler") -> None:
        self._observer = observer
        self._handler = handler

    def stop(self) -> None:
        """Stop the file system observer and wait for its thread to finish."""
        self._observer.stop()
        self._observer.join()


class _SkillsChangeHandler(FileSystemEventHandler):
    """Collects file-change events on skill directories."""

    def __init__(
        self,
        watched_paths: List[Path],
        pending_changes: Dict[str, Set[Path]],
        callback: Optional[Any] = None,
        lock: Optional[Lock] = None,
        run_id: Optional[str] = None,
    ) -> None:
        """
        Args:
            watched_paths: Roots whose file events are recorded.
            pending_changes: Shared map of run id -> changed paths (mutated).
            callback: Optional callable invoked with ``[changed_path]``.
            lock: Optional lock guarding ``pending_changes``.
            run_id: Run id to record changes under; when omitted it is
                inferred from the ``runs/<config_name>/...`` path.
        """
        super().__init__()
        self._watched_paths = watched_paths
        self._pending_changes = pending_changes
        self._callback = callback
        self._lock = lock
        self._run_id = run_id

    def on_any_event(self, event: FileSystemEvent) -> None:
        if event.is_directory:
            return
        src_path = Path(event.src_path)
        if not any(src_path.is_relative_to(watched) for watched in self._watched_paths):
            return
        run_id = self._run_id or self._run_id_from_path(src_path)
        guard = self._lock if self._lock is not None else nullcontext()
        with guard:
            self._pending_changes.setdefault(run_id, set()).add(src_path)
        # Invoke the callback outside the lock so it may call back into the
        # manager (e.g. reload_skills_if_changed) without deadlocking.
        if self._callback:
            self._callback([src_path])

    @staticmethod
    def _run_id_from_path(path: Path) -> str:
        """Infer config_name from a path like runs/{config_name}/skills/active/..."""
        parts = path.parts
        for i, part in enumerate(parts):
            if part == "runs" and i + 1 < len(parts):
                return parts[i + 1]
        return "default"


def _dedupe_preserve_order(items: Iterable[str]) -> List[str]:
    """Return *items* without duplicates, keeping first-occurrence order."""
    return list(dict.fromkeys(items))


def _manifest_list(manifest: Dict[str, object], key: str) -> List[str]:
    """Return ``manifest[key]`` as a list; missing, null or non-list values give []."""
    value = manifest.get(key)
    if isinstance(value, (list, tuple, set)):
        return list(value)
    return []


def _manifest_agent_list(
    manifest: Dict[str, object],
    key: str,
    agent_id: str,
) -> List[str]:
    """Return ``manifest[key][agent_id]`` as a list, tolerating null/missing levels."""
    mapping = manifest.get(key)
    if not isinstance(mapping, dict):
        return []
    value = mapping.get(agent_id)
    if isinstance(value, (list, tuple, set)):
        return list(value)
    return []


def _remove_path(path: Path) -> None:
    """Best-effort removal of a temporary file or directory."""
    if path.is_dir():
        shutil.rmtree(path, ignore_errors=True)
        return
    try:
        path.unlink()
    except OSError:
        # Deliberately best-effort: cleanup must not mask the real outcome.
        pass


def _normalize_skill_name(raw_name: str) -> str:
    """Lower-case, map spaces/hyphens to '_', drop other non-alphanumerics."""
    normalized = str(raw_name or "").strip().lower().replace(" ", "_").replace("-", "_")
    allowed = [ch for ch in normalized if ch.isalnum() or ch == "_"]
    return "".join(allowed).strip("_")