Add per-agent skill workspaces and TraderView management

This commit is contained in:
2026-03-17 13:55:14 +08:00
parent 1f5ee3698e
commit 2daf5717ba
35 changed files with 4774 additions and 331 deletions

View File

@@ -0,0 +1,75 @@
# -*- coding: utf-8 -*-
"""Per-agent run-scoped workspace configuration helpers."""
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
import yaml
@dataclass(frozen=True)
class AgentWorkspaceConfig:
"""Structured agent config loaded from runs/<config>/agents/<agent>/agent.yaml."""
values: Dict[str, Any] = field(default_factory=dict)
@property
def prompt_files(self) -> Optional[List[str]]:
raw = self.values.get("prompt_files")
if not isinstance(raw, list):
return None
files = [
str(item).strip()
for item in raw
if isinstance(item, str) and str(item).strip()
]
return files or None
@property
def enabled_skills(self) -> List[str]:
return _normalized_string_list(self.values.get("enabled_skills"))
@property
def disabled_skills(self) -> List[str]:
return _normalized_string_list(self.values.get("disabled_skills"))
@property
def active_tool_groups(self) -> Optional[List[str]]:
groups = _normalized_string_list(self.values.get("active_tool_groups"))
return groups or None
@property
def disabled_tool_groups(self) -> List[str]:
return _normalized_string_list(self.values.get("disabled_tool_groups"))
def get(self, key: str, default: Any = None) -> Any:
return self.values.get(key, default)
def _normalized_string_list(raw: Any) -> List[str]:
if not isinstance(raw, list):
return []
seen: List[str] = []
for item in raw:
if not isinstance(item, str):
continue
value = item.strip()
if value and value not in seen:
seen.append(value)
return seen
def load_agent_workspace_config(path: Path) -> AgentWorkspaceConfig:
"""Load agent.yaml if present."""
if not path.exists() or not path.is_file():
return AgentWorkspaceConfig()
raw = path.read_text(encoding="utf-8").strip()
if not raw:
return AgentWorkspaceConfig()
parsed = yaml.safe_load(raw) or {}
if not isinstance(parsed, dict):
parsed = {}
return AgentWorkspaceConfig(values=parsed)

View File

@@ -4,6 +4,7 @@
from pathlib import Path
from typing import Any, Optional
from .agent_workspace import load_agent_workspace_config
from backend.config.bootstrap_config import get_bootstrap_config_for_run
from .prompt_loader import PromptLoader
from .skills_manager import SkillsManager
@@ -23,6 +24,26 @@ def _append_section(parts: list[str], title: str, content: str) -> None:
parts.append(f"## {title}\n{content}")
def _build_skill_metadata_summary(skills_manager: SkillsManager, config_name: str, agent_id: str) -> str:
"""Create a compact summary of active skills for prompt routing."""
metadata_items = skills_manager.list_active_skill_metadata(config_name, agent_id)
if not metadata_items:
return ""
lines: list[str] = [
"You can use the following active skills. Prefer the most relevant one, then read its SKILL.md if needed for detailed workflow:",
]
for item in metadata_items:
parts = [f"- `{item.skill_name}`"]
if item.description:
parts.append(item.description)
if item.version:
parts.append(f"version: {item.version}")
parts.append(f"path: {item.path}")
lines.append(" | ".join(parts))
return "\n".join(lines)
def build_agent_system_prompt(
agent_id: str,
config_name: str,
@@ -31,6 +52,13 @@ def build_agent_system_prompt(
) -> str:
"""Build the final system prompt for an agent."""
sections: list[str] = []
canonical_agent_id = (
"portfolio_manager"
if "portfolio" in agent_id
else "risk_manager"
if "risk" in agent_id and not analyst_type
else agent_id
)
if analyst_type:
personas_config = _prompt_loader.load_yaml_config(
@@ -56,11 +84,21 @@ def build_agent_system_prompt(
"portfolio_manager",
"system",
)
elif canonical_agent_id == "portfolio_manager":
base_prompt = _prompt_loader.load_prompt(
"portfolio_manager",
"system",
)
elif agent_id == "risk_manager":
base_prompt = _prompt_loader.load_prompt(
"risk_manager",
"system",
)
elif canonical_agent_id == "risk_manager":
base_prompt = _prompt_loader.load_prompt(
"risk_manager",
"system",
)
else:
raise ValueError(f"Unsupported agent prompt build for: {agent_id}")
@@ -69,6 +107,7 @@ def build_agent_system_prompt(
skills_manager = SkillsManager()
asset_dir = skills_manager.get_agent_asset_dir(config_name, agent_id)
asset_dir.mkdir(parents=True, exist_ok=True)
agent_config = load_agent_workspace_config(asset_dir / "agent.yaml")
bootstrap_config = get_bootstrap_config_for_run(
skills_manager.project_root,
config_name,
@@ -80,26 +119,62 @@ def build_agent_system_prompt(
bootstrap_config.prompt_body,
)
_append_section(
sections,
"Role",
_read_file_if_exists(asset_dir / "ROLE.md"),
)
_append_section(
sections,
"Style",
_read_file_if_exists(asset_dir / "STYLE.md"),
)
_append_section(
sections,
"Policy",
_read_file_if_exists(asset_dir / "POLICY.md"),
)
prompt_files = agent_config.prompt_files or [
"SOUL.md",
"PROFILE.md",
"AGENTS.md",
"POLICY.md",
"MEMORY.md",
]
included_files = set(prompt_files)
title_map = {
"SOUL.md": "Soul",
"PROFILE.md": "Profile",
"AGENTS.md": "Agent Guide",
"POLICY.md": "Policy",
"MEMORY.md": "Memory",
"HEARTBEAT.md": "Heartbeat",
"ROLE.md": "Role",
"STYLE.md": "Style",
}
for filename in prompt_files:
_append_section(
sections,
title_map.get(filename, filename),
_read_file_if_exists(asset_dir / filename),
)
if "ROLE.md" not in included_files:
_append_section(
sections,
"Role",
_read_file_if_exists(asset_dir / "ROLE.md"),
)
if "STYLE.md" not in included_files:
_append_section(
sections,
"Style",
_read_file_if_exists(asset_dir / "STYLE.md"),
)
if "POLICY.md" not in included_files:
_append_section(
sections,
"Policy",
_read_file_if_exists(asset_dir / "POLICY.md"),
)
skill_prompt = toolkit.get_agent_skill_prompt()
if skill_prompt:
_append_section(sections, "Skills", str(skill_prompt))
metadata_summary = _build_skill_metadata_summary(
skills_manager=skills_manager,
config_name=config_name,
agent_id=agent_id,
)
if metadata_summary:
_append_section(sections, "Active Skill Catalog", metadata_summary)
activated_notes = toolkit.get_activated_notes()
if activated_notes:
_append_section(sections, "Tool Usage Notes", str(activated_notes))

View File

@@ -0,0 +1,79 @@
# -*- coding: utf-8 -*-
"""Skill metadata parsing helpers for SKILL.md files."""
from dataclasses import dataclass, field
from pathlib import Path
from typing import List
import yaml
@dataclass(frozen=True)
class SkillMetadata:
"""Parsed metadata for a skill package."""
skill_name: str
path: Path
source: str
name: str
description: str
version: str = ""
tools: List[str] = field(default_factory=list)
def parse_skill_metadata(skill_dir: Path, source: str) -> SkillMetadata:
"""Parse SKILL.md frontmatter with a forgiving schema."""
skill_name = skill_dir.name
skill_file = skill_dir / "SKILL.md"
if not skill_file.exists():
return SkillMetadata(
skill_name=skill_name,
path=skill_dir,
source=source,
name=skill_name,
description="",
)
raw = skill_file.read_text(encoding="utf-8").strip()
frontmatter = {}
body = raw
if raw.startswith("---"):
parts = raw.split("---", 2)
if len(parts) >= 3:
try:
frontmatter = yaml.safe_load(parts[1].strip()) or {}
except yaml.YAMLError:
frontmatter = {}
body = parts[2].strip()
if not isinstance(frontmatter, dict):
frontmatter = {}
description = str(frontmatter.get("description") or "").strip()
if not description and body:
description = body.splitlines()[0].strip().lstrip("#").strip()
return SkillMetadata(
skill_name=skill_name,
path=skill_dir,
source=source,
name=str(frontmatter.get("name") or skill_name).strip() or skill_name,
description=description,
version=str(frontmatter.get("version") or "").strip(),
tools=_string_list(frontmatter.get("tools")),
)
def _string_list(value) -> List[str]:
if isinstance(value, str):
item = value.strip()
return [item] if item else []
if not isinstance(value, list):
return []
seen: List[str] = []
for item in value:
if not isinstance(item, str):
continue
normalized = item.strip()
if normalized and normalized not in seen:
seen.append(normalized)
return seen

View File

@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
"""Manage builtin/customized/active skill directories for each run."""
"""Manage agent-installed and run-active skill directories for each run."""
from pathlib import Path
import shutil
@@ -7,6 +7,8 @@ from typing import Dict, Iterable, List
import yaml
from backend.agents.agent_workspace import load_agent_workspace_config
from backend.agents.skill_metadata import SkillMetadata, parse_skill_metadata
from backend.config.bootstrap_config import get_bootstrap_config_for_run
@@ -26,12 +28,283 @@ class SkillsManager:
def get_active_root(self, config_name: str) -> Path:
return self.runs_root / config_name / "skills" / "active"
def get_agent_skills_root(self, config_name: str, agent_id: str) -> Path:
return self.get_agent_asset_dir(config_name, agent_id) / "skills"
def get_agent_active_root(self, config_name: str, agent_id: str) -> Path:
return self.get_agent_skills_root(config_name, agent_id) / "active"
def get_agent_installed_root(self, config_name: str, agent_id: str) -> Path:
return self.get_agent_skills_root(config_name, agent_id) / "installed"
def get_agent_disabled_root(self, config_name: str, agent_id: str) -> Path:
return self.get_agent_skills_root(config_name, agent_id) / "disabled"
def get_agent_local_root(self, config_name: str, agent_id: str) -> Path:
return self.get_agent_skills_root(config_name, agent_id) / "local"
def get_activation_manifest_path(self, config_name: str) -> Path:
return self.runs_root / config_name / "skills" / "activation.yaml"
def get_agent_asset_dir(self, config_name: str, agent_id: str) -> Path:
return self.runs_root / config_name / "agents" / agent_id
def list_skill_catalog(self) -> List[SkillMetadata]:
"""Return builtin/customized skills with parsed metadata."""
catalog: Dict[str, SkillMetadata] = {}
for source, root in (
("builtin", self.builtin_root),
("customized", self.customized_root),
):
if not root.exists():
continue
for skill_dir in sorted(root.iterdir(), key=lambda item: item.name):
if not skill_dir.is_dir():
continue
if not (skill_dir / "SKILL.md").exists():
continue
metadata = parse_skill_metadata(skill_dir, source=source)
catalog[metadata.skill_name] = metadata
return sorted(catalog.values(), key=lambda item: item.skill_name)
def list_agent_skill_catalog(
self,
config_name: str,
agent_id: str,
) -> List[SkillMetadata]:
"""Return shared plus agent-local skills for one agent."""
catalog = {
item.skill_name: item
for item in self.list_skill_catalog()
}
for item in self.list_agent_local_skills(config_name, agent_id):
catalog[item.skill_name] = item
return sorted(catalog.values(), key=lambda item: item.skill_name)
def list_active_skill_metadata(
self,
config_name: str,
agent_id: str,
) -> List[SkillMetadata]:
"""Return metadata for active skills synced for one agent."""
active_root = self.get_agent_active_root(config_name, agent_id)
if not active_root.exists():
return []
items: List[SkillMetadata] = []
for skill_dir in sorted(active_root.iterdir(), key=lambda item: item.name):
if not skill_dir.is_dir():
continue
if not (skill_dir / "SKILL.md").exists():
continue
items.append(parse_skill_metadata(skill_dir, source="active"))
return items
def list_agent_local_skills(
self,
config_name: str,
agent_id: str,
) -> List[SkillMetadata]:
"""Return metadata for agent-private local skills."""
local_root = self.get_agent_local_root(config_name, agent_id)
if not local_root.exists():
return []
items: List[SkillMetadata] = []
for skill_dir in sorted(local_root.iterdir(), key=lambda item: item.name):
if not skill_dir.is_dir():
continue
if not (skill_dir / "SKILL.md").exists():
continue
items.append(parse_skill_metadata(skill_dir, source="local"))
return items
def load_skill_document(self, skill_name: str) -> Dict[str, object]:
"""Return skill metadata plus markdown body for one skill."""
source_dir = self._resolve_source_dir(skill_name)
return self._load_skill_document_from_dir(
source_dir,
source="customized" if source_dir.parent == self.customized_root else "builtin",
)
def load_agent_skill_document(
self,
config_name: str,
agent_id: str,
skill_name: str,
) -> Dict[str, object]:
"""Return skill metadata plus markdown body for one agent-visible skill."""
source_dir = self._resolve_agent_skill_source_dir(
config_name=config_name,
agent_id=agent_id,
skill_name=skill_name,
)
source = "local"
if source_dir.parent == self.customized_root:
source = "customized"
elif source_dir.parent == self.builtin_root:
source = "builtin"
elif source_dir.parent == self.get_agent_installed_root(config_name, agent_id):
source = "installed"
return self._load_skill_document_from_dir(source_dir, source=source)
def create_agent_local_skill(
self,
config_name: str,
agent_id: str,
skill_name: str,
) -> Path:
"""Create a new local skill directory with a default SKILL.md."""
normalized = _normalize_skill_name(skill_name)
if not normalized:
raise ValueError("Skill name is required.")
local_root = self.get_agent_local_root(config_name, agent_id)
local_root.mkdir(parents=True, exist_ok=True)
skill_dir = local_root / normalized
if skill_dir.exists():
raise FileExistsError(f"Local skill already exists: {normalized}")
skill_dir.mkdir(parents=True, exist_ok=False)
(skill_dir / "SKILL.md").write_text(
"---\n"
f"name: {normalized}\n"
"description: 当用户提出与该本地技能相关的专门任务时,应使用此技能。\n"
"version: 1.0.0\n"
"---\n\n"
f"# {normalized}\n\n"
"在这里描述该交易员的专有分析流程、判断框架和可复用步骤。\n",
encoding="utf-8",
)
return skill_dir
def update_agent_local_skill(
self,
config_name: str,
agent_id: str,
skill_name: str,
content: str,
) -> Path:
"""Overwrite one agent-local SKILL.md."""
normalized = _normalize_skill_name(skill_name)
if not normalized:
raise ValueError("Skill name is required.")
skill_dir = self.get_agent_local_root(config_name, agent_id) / normalized
if not skill_dir.exists():
raise FileNotFoundError(f"Unknown local skill: {normalized}")
(skill_dir / "SKILL.md").write_text(content, encoding="utf-8")
return skill_dir
def delete_agent_local_skill(
self,
config_name: str,
agent_id: str,
skill_name: str,
) -> None:
"""Delete one agent-local skill directory."""
normalized = _normalize_skill_name(skill_name)
if not normalized:
raise ValueError("Skill name is required.")
skill_dir = self.get_agent_local_root(config_name, agent_id) / normalized
if not skill_dir.exists():
raise FileNotFoundError(f"Unknown local skill: {normalized}")
shutil.rmtree(skill_dir)
def _load_skill_document_from_dir(
self,
source_dir: Path,
*,
source: str,
) -> Dict[str, object]:
"""Return metadata plus markdown body for one resolved skill directory."""
metadata = parse_skill_metadata(
source_dir,
source=source,
)
skill_file = source_dir / "SKILL.md"
raw = skill_file.read_text(encoding="utf-8").strip() if skill_file.exists() else ""
body = raw
if raw.startswith("---"):
parts = raw.split("---", 2)
if len(parts) >= 3:
body = parts[2].strip()
return {
"skill_name": metadata.skill_name,
"name": metadata.name,
"description": metadata.description,
"version": metadata.version,
"tools": metadata.tools,
"source": metadata.source,
"content": body,
}
def update_agent_skill_overrides(
self,
config_name: str,
agent_id: str,
*,
enable: Iterable[str] | None = None,
disable: Iterable[str] | None = None,
) -> Dict[str, List[str]]:
"""Persist per-agent enabled/disabled skill overrides in agent.yaml."""
asset_dir = self.get_agent_asset_dir(config_name, agent_id)
asset_dir.mkdir(parents=True, exist_ok=True)
config_path = asset_dir / "agent.yaml"
current = load_agent_workspace_config(config_path)
values = dict(current.values)
enabled = _dedupe_preserve_order(current.enabled_skills)
disabled_set = set(current.disabled_skills)
for skill_name in enable or []:
if skill_name not in enabled:
enabled.append(skill_name)
disabled_set.discard(skill_name)
for skill_name in disable or []:
disabled_set.add(skill_name)
enabled = [item for item in enabled if item != skill_name]
values["enabled_skills"] = enabled
values["disabled_skills"] = sorted(disabled_set)
config_path.write_text(
yaml.safe_dump(values, allow_unicode=True, sort_keys=False),
encoding="utf-8",
)
return {
"enabled_skills": enabled,
"disabled_skills": sorted(disabled_set),
}
def forget_agent_skill_overrides(
self,
config_name: str,
agent_id: str,
skill_names: Iterable[str],
) -> Dict[str, List[str]]:
"""Remove skills from both enabled/disabled overrides in agent.yaml."""
asset_dir = self.get_agent_asset_dir(config_name, agent_id)
asset_dir.mkdir(parents=True, exist_ok=True)
config_path = asset_dir / "agent.yaml"
current = load_agent_workspace_config(config_path)
values = dict(current.values)
removed = set(skill_names)
enabled = [item for item in current.enabled_skills if item not in removed]
disabled = [item for item in current.disabled_skills if item not in removed]
values["enabled_skills"] = enabled
values["disabled_skills"] = disabled
config_path.write_text(
yaml.safe_dump(values, allow_unicode=True, sort_keys=False),
encoding="utf-8",
)
return {
"enabled_skills": enabled,
"disabled_skills": disabled,
}
def ensure_activation_manifest(self, config_name: str) -> Path:
manifest_path = self.get_activation_manifest_path(config_name)
manifest_path.parent.mkdir(parents=True, exist_ok=True)
@@ -62,6 +335,34 @@ class SkillsManager:
raise FileNotFoundError(f"Unknown skill: {skill_name}")
def _resolve_agent_skill_source_dir(
self,
config_name: str,
agent_id: str,
skill_name: str,
) -> Path:
"""Resolve one skill from the agent-local workspace or shared registry."""
for root in (
self.get_agent_local_root(config_name, agent_id),
self.get_agent_installed_root(config_name, agent_id),
):
candidate = root / skill_name
if candidate.exists() and (candidate / "SKILL.md").exists():
return candidate
return self._resolve_source_dir(skill_name)
def _skill_exists_for_agent(
self,
config_name: str,
agent_id: str,
skill_name: str,
) -> bool:
try:
self._resolve_agent_skill_source_dir(config_name, agent_id, skill_name)
except FileNotFoundError:
return False
return True
def _persist_runtime_edits(
self,
config_name: str,
@@ -125,6 +426,13 @@ class SkillsManager:
bootstrap = get_bootstrap_config_for_run(self.project_root, config_name)
override = bootstrap.agent_override(agent_id)
skills = list(override.get("skills", list(default_skills)))
agent_config = load_agent_workspace_config(
self.get_agent_asset_dir(config_name, agent_id) / "agent.yaml",
)
for skill_name in agent_config.enabled_skills:
if skill_name not in skills:
skills.append(skill_name)
manifest = self.load_activation_manifest(config_name)
for skill_name in manifest.get("global_enabled_skills", []):
@@ -139,51 +447,61 @@ class SkillsManager:
disabled.update(
manifest.get("agent_disabled_skills", {}).get(agent_id, []),
)
disabled.update(agent_config.disabled_skills)
return [skill for skill in skills if skill not in disabled]
for item in self.list_agent_local_skills(config_name, agent_id):
if item.skill_name not in skills:
skills.append(item.skill_name)
def sync_active_skills(
return [
skill
for skill in skills
if skill not in disabled
and self._skill_exists_for_agent(config_name, agent_id, skill)
]
def sync_skill_dirs(
self,
config_name: str,
skill_names: Iterable[str],
target_root: Path,
skill_sources: Dict[str, Path],
) -> List[Path]:
"""Sync selected skills into the run workspace and return their paths."""
active_root = self.get_active_root(config_name)
active_root.mkdir(parents=True, exist_ok=True)
"""Sync selected skill directories into one target root."""
target_root.mkdir(parents=True, exist_ok=True)
synced_paths: List[Path] = []
wanted = set(skill_names)
wanted = set(skill_sources)
for existing in active_root.iterdir():
for existing in target_root.iterdir():
if existing.is_dir() and existing.name not in wanted:
self._persist_runtime_edits(
config_name=config_name,
skill_name=existing.name,
active_dir=existing,
)
shutil.rmtree(existing)
for skill_name in skill_names:
source_dir = self._resolve_source_dir(skill_name)
target_dir = active_root / skill_name
for skill_name, source_dir in skill_sources.items():
target_dir = target_root / skill_name
if target_dir.exists():
self._persist_runtime_edits(
config_name=config_name,
skill_name=skill_name,
active_dir=target_dir,
)
shutil.rmtree(target_dir)
shutil.copytree(source_dir, target_dir)
synced_paths.append(target_dir)
return synced_paths
def sync_active_skills(
self,
target_root: Path,
skill_names: Iterable[str],
) -> List[Path]:
"""Sync selected shared skills into one active directory."""
skill_sources = {
skill_name: self._resolve_source_dir(skill_name)
for skill_name in skill_names
}
return self.sync_skill_dirs(target_root, skill_sources)
def prepare_active_skills(
self,
config_name: str,
agent_defaults: Dict[str, Iterable[str]],
) -> Dict[str, List[Path]]:
"""Resolve all agent skills, sync the union once, and map paths per agent."""
"""Resolve all agent skills into per-agent installed/active workspaces."""
resolved: Dict[str, List[str]] = {}
union: List[str] = []
@@ -198,10 +516,112 @@ class SkillsManager:
if skill_name not in union:
union.append(skill_name)
self.sync_active_skills(config_name=config_name, skill_names=union)
active_root = self.get_active_root(config_name)
# Maintain the legacy union directory for compatibility/debugging.
# Agent-local skills remain private to the agent workspace.
self.sync_active_skills(
target_root=self.get_active_root(config_name),
skill_names=[
skill_name
for skill_name in union
if self._is_shared_skill(skill_name)
],
)
return {
agent_id: [active_root / skill_name for skill_name in skill_names]
for agent_id, skill_names in resolved.items()
}
active_map: Dict[str, List[Path]] = {}
for agent_id, skill_names in resolved.items():
installed_sources = {
skill_name: self._resolve_source_dir(skill_name)
for skill_name in skill_names
if (self.get_agent_local_root(config_name, agent_id) / skill_name).exists() is False
}
installed_paths = self.sync_skill_dirs(
target_root=self.get_agent_installed_root(config_name, agent_id),
skill_sources=installed_sources,
)
local_root = self.get_agent_local_root(config_name, agent_id)
local_sources = {
skill_name: local_root / skill_name
for skill_name in skill_names
if (local_root / skill_name).exists()
}
active_sources = {
path.name: path for path in installed_paths
}
active_sources.update(local_sources)
active_map[agent_id] = self.sync_skill_dirs(
target_root=self.get_agent_active_root(config_name, agent_id),
skill_sources=active_sources,
)
disabled_names = _dedupe_preserve_order(
self._resolve_disabled_skill_names(
config_name=config_name,
agent_id=agent_id,
default_skills=agent_defaults.get(agent_id, []),
),
)
disabled_sources = {
skill_name: self._resolve_agent_skill_source_dir(
config_name=config_name,
agent_id=agent_id,
skill_name=skill_name,
)
for skill_name in disabled_names
}
self.sync_skill_dirs(
target_root=self.get_agent_disabled_root(config_name, agent_id),
skill_sources=disabled_sources,
)
return active_map
def _is_shared_skill(self, skill_name: str) -> bool:
try:
self._resolve_source_dir(skill_name)
except FileNotFoundError:
return False
return True
def _resolve_disabled_skill_names(
self,
config_name: str,
agent_id: str,
default_skills: Iterable[str],
) -> List[str]:
"""Resolve explicit disabled skills for one agent."""
bootstrap = get_bootstrap_config_for_run(self.project_root, config_name)
override = bootstrap.agent_override(agent_id)
baseline = list(override.get("skills", list(default_skills)))
agent_config = load_agent_workspace_config(
self.get_agent_asset_dir(config_name, agent_id) / "agent.yaml",
)
manifest = self.load_activation_manifest(config_name)
disabled = list(manifest.get("global_disabled_skills", []))
disabled.extend(manifest.get("agent_disabled_skills", {}).get(agent_id, []))
disabled.extend(agent_config.disabled_skills)
for skill_name in baseline:
if skill_name in agent_config.disabled_skills and skill_name not in disabled:
disabled.append(skill_name)
for item in self.list_agent_local_skills(config_name, agent_id):
if item.skill_name in agent_config.disabled_skills and item.skill_name not in disabled:
disabled.append(item.skill_name)
return [
skill
for skill in disabled
if self._skill_exists_for_agent(config_name, agent_id, skill)
]
def _dedupe_preserve_order(items: Iterable[str]) -> List[str]:
result: List[str] = []
for item in items:
if item not in result:
result.append(item)
return result
def _normalize_skill_name(raw_name: str) -> str:
normalized = str(raw_name or "").strip().lower().replace(" ", "_").replace("-", "_")
allowed = [ch for ch in normalized if ch.isalnum() or ch == "_"]
return "".join(allowed).strip("_")

View File

@@ -3,6 +3,7 @@
from typing import Any, Dict, Iterable
from .agent_workspace import load_agent_workspace_config
from backend.config.bootstrap_config import get_bootstrap_config_for_run
import yaml
@@ -151,6 +152,9 @@ def create_agent_toolkit(
profiles = load_agent_profiles()
profile = profiles.get(agent_id, {})
skills_manager = SkillsManager()
agent_config = load_agent_workspace_config(
skills_manager.get_agent_asset_dir(config_name, agent_id) / "agent.yaml",
)
bootstrap_config = get_bootstrap_config_for_run(
skills_manager.project_root,
config_name,
@@ -158,8 +162,16 @@ def create_agent_toolkit(
override = bootstrap_config.agent_override(agent_id)
active_groups = override.get(
"active_tool_groups",
profile.get("active_tool_groups", []),
agent_config.active_tool_groups
or profile.get("active_tool_groups", []),
)
disabled_groups = set(agent_config.disabled_tool_groups)
if disabled_groups:
active_groups = [
group_name
for group_name in active_groups
if group_name not in disabled_groups
]
toolkit = Toolkit(
agent_skill_instruction=(
@@ -184,7 +196,7 @@ def create_agent_toolkit(
default_skills=profile.get("skills", []),
)
active_skill_dirs = [
skills_manager.get_active_root(config_name) / skill_name
skills_manager.get_agent_active_root(config_name, agent_id) / skill_name
for skill_name in skill_names
]

View File

@@ -4,6 +4,8 @@
from pathlib import Path
from typing import Dict, Iterable, Optional
import yaml
from .skills_manager import SkillsManager
@@ -59,6 +61,10 @@ class WorkspaceManager:
agent_id,
)
asset_dir.mkdir(parents=True, exist_ok=True)
(asset_dir / "skills" / "installed").mkdir(parents=True, exist_ok=True)
(asset_dir / "skills" / "active").mkdir(parents=True, exist_ok=True)
(asset_dir / "skills" / "disabled").mkdir(parents=True, exist_ok=True)
(asset_dir / "skills" / "local").mkdir(parents=True, exist_ok=True)
self._ensure_file(
asset_dir / "ROLE.md",
@@ -81,6 +87,35 @@ class WorkspaceManager:
f"{policy_seed}".strip()
+ "\n",
)
self._ensure_file(
asset_dir / "SOUL.md",
"# Soul\n\n"
"Describe the agent's temperament, reasoning posture, and voice.\n\n",
)
self._ensure_file(
asset_dir / "PROFILE.md",
"# Profile\n\n"
"Track this agent's long-lived investment style, preferences, and strengths.\n\n",
)
self._ensure_file(
asset_dir / "AGENTS.md",
"# Agent Guide\n\n"
"Document how this agent should work, collaborate, and choose tools or skills.\n\n",
)
self._ensure_file(
asset_dir / "MEMORY.md",
"# Memory\n\n"
"Store durable lessons, heuristics, and reminders for this agent.\n\n",
)
self._ensure_file(
asset_dir / "HEARTBEAT.md",
"# Heartbeat\n\n"
"Optional checklist for periodic review or self-reflection.\n\n",
)
self._ensure_agent_yaml(
asset_dir / "agent.yaml",
agent_id=agent_id,
)
return asset_dir
def initialize_default_assets(
@@ -138,3 +173,27 @@ class WorkspaceManager:
def _ensure_file(path: Path, content: str) -> None:
if not path.exists():
path.write_text(content, encoding="utf-8")
@staticmethod
def _ensure_agent_yaml(path: Path, agent_id: str) -> None:
if path.exists():
return
payload = {
"agent_id": agent_id,
"prompt_files": [
"SOUL.md",
"PROFILE.md",
"AGENTS.md",
"POLICY.md",
"MEMORY.md",
],
"enabled_skills": [],
"disabled_skills": [],
"active_tool_groups": [],
"disabled_tool_groups": [],
}
path.write_text(
yaml.safe_dump(payload, allow_unicode=True, sort_keys=False),
encoding="utf-8",
)