# -*- coding: utf-8 -*- """Workspace Manager - Create and manage agent workspaces.""" import logging from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional import yaml logger = logging.getLogger(__name__) @dataclass class WorkspaceConfig: """Configuration for a workspace.""" workspace_id: str name: str = "" description: str = "" created_at: str = "" metadata: Dict[str, Any] = field(default_factory=dict) def to_dict(self) -> Dict[str, Any]: """Serialize to dictionary.""" return { "workspace_id": self.workspace_id, "name": self.name, "description": self.description, "created_at": self.created_at, "metadata": self.metadata, } @classmethod def from_dict(cls, data: Dict[str, Any]) -> "WorkspaceConfig": """Create from dictionary.""" return cls( workspace_id=data.get("workspace_id", ""), name=data.get("name", ""), description=data.get("description", ""), created_at=data.get("created_at", ""), metadata=data.get("metadata", {}), ) class WorkspaceRegistry: """Registry for persistent workspace definitions (design-time).""" def __init__(self, project_root: Optional[Path] = None): """Initialize the workspace manager. Args: project_root: Root directory of the project """ self.project_root = project_root or Path(__file__).parent.parent.parent self.workspaces_root = self.project_root / "workspaces" self.workspaces_root.mkdir(parents=True, exist_ok=True) def create_workspace( self, workspace_id: str, name: Optional[str] = None, description: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, ) -> WorkspaceConfig: """Create a new workspace with directory structure. Args: workspace_id: Unique identifier for the workspace name: Display name for the workspace description: Optional description metadata: Optional metadata dictionary Returns: WorkspaceConfig instance Raises: ValueError: If workspace already exists """ workspace_dir = self.workspaces_root / workspace_id if workspace_dir.exists(): raise ValueError(f"Workspace '{workspace_id}' already exists") # Create directory structure workspace_dir.mkdir(parents=True, exist_ok=True) # Create subdirectories (workspace_dir / "agents").mkdir(exist_ok=True) (workspace_dir / "shared" / "market_data").mkdir(parents=True, exist_ok=True) (workspace_dir / "shared" / "memories").mkdir(parents=True, exist_ok=True) # Create workspace.yaml from datetime import datetime config = WorkspaceConfig( workspace_id=workspace_id, name=name or workspace_id, description=description or "", created_at=datetime.now().isoformat(), metadata=metadata or {}, ) self._write_workspace_config(workspace_dir, config) return config def list_workspaces(self) -> List[WorkspaceConfig]: """List all workspaces. Returns: List of WorkspaceConfig instances """ workspaces = [] if not self.workspaces_root.exists(): return workspaces for workspace_dir in self.workspaces_root.iterdir(): if not workspace_dir.is_dir(): continue config_path = workspace_dir / "workspace.yaml" if config_path.exists(): try: with open(config_path, "r", encoding="utf-8") as f: data = yaml.safe_load(f) or {} workspaces.append(WorkspaceConfig.from_dict(data)) except Exception as e: logger.warning(f"Failed to load workspace config {config_path}: {e}") return workspaces def get_workspace_agents(self, workspace_id: str) -> List[Dict[str, Any]]: """Get all agents in a workspace. Args: workspace_id: ID of the workspace Returns: List of agent information dictionaries Raises: ValueError: If workspace doesn't exist """ workspace_dir = self.workspaces_root / workspace_id if not workspace_dir.exists(): raise ValueError(f"Workspace '{workspace_id}' does not exist") agents = [] agents_dir = workspace_dir / "agents" if not agents_dir.exists(): return agents for agent_dir in agents_dir.iterdir(): if not agent_dir.is_dir(): continue config_path = agent_dir / "agent.yaml" if config_path.exists(): try: with open(config_path, "r", encoding="utf-8") as f: config = yaml.safe_load(f) or {} agents.append({ "agent_id": agent_dir.name, "agent_type": config.get("agent_type", "unknown"), "config_path": str(config_path), }) except Exception as e: logger.warning(f"Failed to load agent config {config_path}: {e}") return agents def get_agent_workspace(self, agent_id: str, workspace_id: str) -> Optional[Path]: """Get the workspace path for an agent. Args: agent_id: ID of the agent workspace_id: ID of the workspace Returns: Path to agent directory, or None if not found """ agent_dir = self.workspaces_root / workspace_id / "agents" / agent_id if agent_dir.exists(): return agent_dir return None def workspace_exists(self, workspace_id: str) -> bool: """Check if a workspace exists. Args: workspace_id: ID of the workspace Returns: True if workspace exists, False otherwise """ workspace_dir = self.workspaces_root / workspace_id return workspace_dir.exists() and (workspace_dir / "workspace.yaml").exists() def delete_workspace(self, workspace_id: str, force: bool = False) -> bool: """Delete a workspace and all its agents. Args: workspace_id: ID of the workspace to delete force: If True, delete even if workspace has agents Returns: True if deleted, False if workspace didn't exist Raises: ValueError: If workspace has agents and force is False """ import shutil workspace_dir = self.workspaces_root / workspace_id if not workspace_dir.exists(): return False # Check for agents agents_dir = workspace_dir / "agents" if agents_dir.exists() and any(agents_dir.iterdir()): if not force: raise ValueError( f"Workspace '{workspace_id}' contains agents. " "Use force=True to delete anyway." ) shutil.rmtree(workspace_dir) return True def get_workspace_path(self, workspace_id: str) -> Path: """Get the path to a workspace directory. Args: workspace_id: ID of the workspace Returns: Path to workspace directory """ return self.workspaces_root / workspace_id def get_shared_data_path(self, workspace_id: str) -> Optional[Path]: """Get the shared data directory for a workspace. Args: workspace_id: ID of the workspace Returns: Path to shared data directory, or None if workspace doesn't exist """ workspace_dir = self.workspaces_root / workspace_id if not workspace_dir.exists(): return None return workspace_dir / "shared" def update_workspace_config( self, workspace_id: str, name: Optional[str] = None, description: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, ) -> WorkspaceConfig: """Update workspace configuration. Args: workspace_id: ID of the workspace name: New display name (optional) description: New description (optional) metadata: Metadata to merge (optional) Returns: Updated WorkspaceConfig Raises: ValueError: If workspace doesn't exist """ workspace_dir = self.workspaces_root / workspace_id if not workspace_dir.exists(): raise ValueError(f"Workspace '{workspace_id}' does not exist") config_path = workspace_dir / "workspace.yaml" current_config = {} if config_path.exists(): try: with open(config_path, "r", encoding="utf-8") as f: current_config = yaml.safe_load(f) or {} except Exception as e: logger.warning(f"Failed to load existing config {config_path}: {e}") # Update fields if name is not None: current_config["name"] = name if description is not None: current_config["description"] = description if metadata is not None: current_config["metadata"] = {**current_config.get("metadata", {}), **metadata} config = WorkspaceConfig.from_dict(current_config) self._write_workspace_config(workspace_dir, config) return config def _write_workspace_config(self, workspace_dir: Path, config: WorkspaceConfig) -> None: """Write workspace configuration to file. Args: workspace_dir: Workspace directory config: Workspace configuration """ config_path = workspace_dir / "workspace.yaml" with open(config_path, "w", encoding="utf-8") as f: yaml.safe_dump(config.to_dict(), f, allow_unicode=True, sort_keys=False) # Backward-compatible alias: legacy imports expect WorkspaceManager. WorkspaceManager = WorkspaceRegistry