# -*- coding: utf-8 -*-
"""Agent Factory - Dynamic creation and management of EvoAgents."""

import logging
import re
import shutil
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional

import yaml

logger = logging.getLogger(__name__)


@dataclass
class ModelConfig:
    """Model configuration for an agent."""

    model_name: str = "gpt-4o"
    temperature: float = 0.7
    max_tokens: int = 4096


@dataclass
class RoleConfig:
    """Role configuration for an agent.

    ``focus_areas`` and ``constraints`` default to ``None`` and are replaced
    by a fresh empty list in ``__post_init__`` so that instances never share
    a mutable default.
    """

    name: str
    description: str = ""
    focus_areas: Optional[List[str]] = None
    constraints: Optional[List[str]] = None

    def __post_init__(self):
        if self.focus_areas is None:
            self.focus_areas = []
        if self.constraints is None:
            self.constraints = []


class EvoAgent:
    """Represents a configured agent instance."""

    def __init__(
        self,
        agent_id: str,
        agent_type: str,
        workspace_id: str,
        config_path: Path,
        model_config: Optional[ModelConfig] = None,
        role_config: Optional[RoleConfig] = None,
    ):
        """Store the agent's identity and configuration.

        Args:
            agent_id: Unique identifier for the agent
            agent_type: Type of agent (e.g., "technical_analyst")
            workspace_id: ID of the workspace the agent lives in
            config_path: Path to the agent's ``agent.yaml``; its parent
                directory becomes ``agent_dir``
            model_config: Model configuration (a default ``ModelConfig`` is
                used when omitted)
            role_config: Role configuration, may be None
        """
        self.agent_id = agent_id
        self.agent_type = agent_type
        self.workspace_id = workspace_id
        # Coerce so that a plain string path also works with ``.parent``.
        self.config_path = Path(config_path)
        self.model_config = model_config or ModelConfig()
        self.role_config = role_config
        self.agent_dir = self.config_path.parent

    def to_dict(self) -> Dict[str, Any]:
        """Serialize agent to dictionary.

        The returned structure is independent of the agent: ``role_config``
        is converted with ``dataclasses.asdict`` (which copies its lists), so
        mutating the result does not alter the agent's configuration.
        """
        role = self.role_config
        return {
            "agent_id": self.agent_id,
            "agent_type": self.agent_type,
            "workspace_id": self.workspace_id,
            "config_path": str(self.config_path),
            "agent_dir": str(self.agent_dir),
            "model_config": {
                "model_name": self.model_config.model_name,
                "temperature": self.model_config.temperature,
                "max_tokens": self.model_config.max_tokens,
            },
            "role_config": asdict(role) if role else None,
        }


class AgentFactory:
    """Factory for creating, cloning, and managing agents."""

    # Default role templates by agent type
    ROLE_TEMPLATES = {
        "technical_analyst": {
            "name": "Technical Analyst",
            "description": "Analyze price patterns, trends, and technical indicators.",
            "focus_areas": [
                "Price action and chart patterns",
                "Support and resistance levels",
                "Technical indicators (RSI, MACD, Moving Averages)",
                "Volume analysis",
            ],
            "constraints": [
                "State clear signal, confidence, and invalidation conditions",
                "Use available technical analysis tools",
            ],
        },
        "fundamentals_analyst": {
            "name": "Fundamentals Analyst",
            "description": "Analyze company financials, earnings, and business metrics.",
            "focus_areas": [
                "Financial statements analysis",
                "Earnings reports and guidance",
                "Valuation metrics",
                "Business model and competitive position",
            ],
            "constraints": [
                "State clear signal, confidence, and invalidation conditions",
                "Use available fundamental analysis tools",
            ],
        },
        "sentiment_analyst": {
            "name": "Sentiment Analyst",
            "description": "Analyze market sentiment, news, and social signals.",
            "focus_areas": [
                "News sentiment analysis",
                "Social media sentiment",
                "Analyst ratings and price targets",
                "Insider activity",
            ],
            "constraints": [
                "State clear signal, confidence, and invalidation conditions",
                "Use available sentiment analysis tools",
            ],
        },
        "valuation_analyst": {
            "name": "Valuation Analyst",
            "description": "Perform valuation analysis and price target calculations.",
            "focus_areas": [
                "DCF and comparable valuation",
                "Price target derivation",
                "Margin of safety assessment",
                "Risk-adjusted return expectations",
            ],
            "constraints": [
                "State clear signal, confidence, and invalidation conditions",
                "Use available valuation tools",
            ],
        },
        "risk_manager": {
            "name": "Risk Manager",
            "description": "Quantify concentration, leverage, liquidity, and volatility risk.",
            "focus_areas": [
                "Portfolio concentration risk",
                "Leverage and margin analysis",
                "Liquidity assessment",
                "Volatility and drawdown risk",
            ],
            "constraints": [
                "Prioritize highest-severity risk first",
                "State concrete limits and recommendations",
                "Use available risk tools before issuing final memo",
            ],
        },
        "portfolio_manager": {
            "name": "Portfolio Manager",
            "description": "Synthesize analyst and risk inputs into portfolio decisions.",
            "focus_areas": [
                "Position sizing and allocation",
                "Risk-adjusted portfolio construction",
                "Trade execution timing",
                "Portfolio rebalancing",
            ],
            "constraints": [
                "Be concise, capital-aware, and explicit about sizing rationale",
                "Respect cash, margin, and concentration constraints",
                "Consider all analyst inputs before decisions",
            ],
        },
    }

    # Sub-directories created under every agent's ``skills`` directory.
    _SKILL_SUBDIRS = ("active", "local", "installed", "disabled")

    def __init__(self, project_root: Optional[Path] = None):
        """Initialize the agent factory.

        Args:
            project_root: Root directory of the project. When omitted (or
                falsy) it defaults to three directory levels above this
                file.
        """
        if project_root:
            self.project_root = Path(project_root)
        else:
            self.project_root = Path(__file__).parent.parent.parent
        self.workspaces_root = self.project_root / "workspaces"
        # NOTE: not read by any method in this module.
        self.template_dir = self.project_root / "backend" / "workspaces" / ".template"

    @staticmethod
    def _validate_identifier(value: str, kind: str) -> None:
        """Reject ids that are empty or could address another directory.

        Ids are used verbatim as single path components. An empty id would
        resolve to the parent directory and ``..`` or a path separator would
        escape it, which is dangerous for ``delete_agent`` in particular.

        Args:
            value: The id to check
            kind: Human-readable kind used in the error ("agent", "workspace")

        Raises:
            ValueError: If ``value`` is not a usable single path component
        """
        if (
            not isinstance(value, str)
            or not value
            or value in (".", "..")
            or "/" in value
            or "\\" in value
        ):
            raise ValueError(f"Invalid {kind} id: {value!r}")

    def create_agent(
        self,
        agent_id: str,
        agent_type: str,
        workspace_id: str,
        model_config: Optional[ModelConfig] = None,
        role_config: Optional[RoleConfig] = None,
        clone_from: Optional[str] = None,
    ) -> EvoAgent:
        """Create a new agent.

        ROLE.md is always (re)generated from ``role_config``, so a ROLE.md
        copied from ``clone_from`` is overwritten. If any step fails after
        the agent directory has been created, the directory is removed again
        so that a later retry is not blocked by an "already exists" error.

        Args:
            agent_id: Unique identifier for the agent
            agent_type: Type of agent (e.g., "technical_analyst")
            workspace_id: ID of the workspace to create agent in
            model_config: Model configuration
            role_config: Role configuration (auto-generated if None)
            clone_from: Path to existing agent to clone from (optional)

        Returns:
            EvoAgent instance

        Raises:
            ValueError: If an id is invalid, the agent already exists, the
                workspace doesn't exist, or ``clone_from`` doesn't exist
        """
        self._validate_identifier(agent_id, "agent")
        self._validate_identifier(workspace_id, "workspace")

        workspace_dir = self.workspaces_root / workspace_id
        if not workspace_dir.exists():
            raise ValueError(f"Workspace '{workspace_id}' does not exist")

        agent_dir = workspace_dir / "agents" / agent_id
        if agent_dir.exists():
            raise ValueError(f"Agent '{agent_id}' already exists in workspace '{workspace_id}'")

        # Create directory structure
        agent_dir.mkdir(parents=True, exist_ok=True)
        try:
            for subdir in self._SKILL_SUBDIRS:
                (agent_dir / "skills" / subdir).mkdir(parents=True, exist_ok=True)

            # Copy template or clone existing agent
            if clone_from:
                self._clone_agent_files(clone_from, agent_dir, agent_id)
            else:
                self._copy_template(agent_dir, agent_id, agent_type)

            # Generate role config if not provided
            if role_config is None:
                role_config = self._generate_role_config(agent_type)

            # Generate ROLE.md
            self._generate_role_md(agent_dir, role_config)

            # Write agent.yaml
            config_path = agent_dir / "agent.yaml"
            self._write_agent_yaml(config_path, agent_id, agent_type, model_config)
        except Exception:
            # Do not leave a half-built agent behind; the directory did not
            # exist before this call, so removing it is safe.
            shutil.rmtree(agent_dir, ignore_errors=True)
            raise

        return EvoAgent(
            agent_id=agent_id,
            agent_type=agent_type,
            workspace_id=workspace_id,
            config_path=config_path,
            model_config=model_config,
            role_config=role_config,
        )

    def delete_agent(self, agent_id: str, workspace_id: str) -> bool:
        """Delete an agent and its workspace.

        Args:
            agent_id: ID of the agent to delete
            workspace_id: ID of the workspace containing the agent

        Returns:
            True if deleted, False if agent didn't exist

        Raises:
            ValueError: If an id is empty or contains path components; this
                prevents recursively deleting a directory other than a
                single agent's own directory
        """
        self._validate_identifier(agent_id, "agent")
        self._validate_identifier(workspace_id, "workspace")

        agent_dir = self.workspaces_root / workspace_id / "agents" / agent_id
        if not agent_dir.exists():
            return False

        shutil.rmtree(agent_dir)
        return True

    def clone_agent(
        self,
        source_agent_id: str,
        source_workspace_id: str,
        new_agent_id: str,
        target_workspace_id: Optional[str] = None,
        model_config: Optional[ModelConfig] = None,
    ) -> EvoAgent:
        """Clone an existing agent.

        The agent type is read from the source's ``agent.yaml``; when that
        file is missing, empty, not a mapping, or has no ``agent_type`` key,
        the type "generic" is used.

        Args:
            source_agent_id: ID of the agent to clone
            source_workspace_id: Workspace containing the source agent
            new_agent_id: ID for the new agent
            target_workspace_id: Target workspace (defaults to source workspace)
            model_config: Optional new model configuration

        Returns:
            EvoAgent instance for the cloned agent

        Raises:
            ValueError: If an id is invalid, the source agent is not found,
                or ``create_agent`` rejects the new agent
        """
        self._validate_identifier(source_agent_id, "agent")
        self._validate_identifier(source_workspace_id, "workspace")
        target_workspace_id = target_workspace_id or source_workspace_id

        source_dir = self.workspaces_root / source_workspace_id / "agents" / source_agent_id
        if not source_dir.exists():
            raise ValueError(f"Source agent '{source_agent_id}' not found")

        # Load source agent config
        source_config_path = source_dir / "agent.yaml"
        source_config: Dict[str, Any] = {}
        if source_config_path.exists():
            with open(source_config_path, "r", encoding="utf-8") as f:
                loaded = yaml.safe_load(f)
            # A YAML document may be a list or scalar; only a mapping has keys.
            if isinstance(loaded, dict):
                source_config = loaded

        agent_type = source_config.get("agent_type", "generic")

        return self.create_agent(
            agent_id=new_agent_id,
            agent_type=agent_type,
            workspace_id=target_workspace_id,
            model_config=model_config,
            clone_from=str(source_dir),
        )

    def list_agents(self, workspace_id: Optional[str] = None) -> List[Dict[str, Any]]:
        """List all agents.

        Only agent directories containing a loadable ``agent.yaml`` mapping
        are reported; unreadable or malformed configs are logged and skipped.
        Results are ordered by workspace path, then agent directory path.

        Args:
            workspace_id: Optional workspace to filter by

        Returns:
            List of agent information dictionaries
        """
        agents: List[Dict[str, Any]] = []

        if workspace_id:
            workspaces = [self.workspaces_root / workspace_id]
        else:
            if not self.workspaces_root.exists():
                return agents
            workspaces = sorted(d for d in self.workspaces_root.iterdir() if d.is_dir())

        for workspace in workspaces:
            agents_dir = workspace / "agents"
            if not agents_dir.exists():
                continue

            for agent_dir in sorted(agents_dir.iterdir()):
                if not agent_dir.is_dir():
                    continue

                config_path = agent_dir / "agent.yaml"
                if not config_path.exists():
                    continue

                try:
                    with open(config_path, "r", encoding="utf-8") as f:
                        config = yaml.safe_load(f) or {}
                except Exception as e:
                    # Listing is best-effort: one broken config must not
                    # prevent the remaining agents from being reported.
                    logger.warning("Failed to load agent config %s: %s", config_path, e)
                    continue

                if not isinstance(config, dict):
                    logger.warning(
                        "Failed to load agent config %s: top-level value is not a mapping",
                        config_path,
                    )
                    continue

                agents.append({
                    "agent_id": agent_dir.name,
                    "workspace_id": workspace.name,
                    "agent_type": config.get("agent_type", "unknown"),
                    "config_path": str(config_path),
                })

        return agents

    def _copy_template(
        self,
        agent_dir: Path,
        agent_id: str,
        agent_type: str,
    ) -> None:
        """Write the default markdown files into the agent directory.

        Files that already exist are left untouched. Nothing is read from
        ``self.template_dir``; the contents are generated inline.

        Args:
            agent_dir: Target agent directory
            agent_id: ID of the agent (interpolated into the file contents)
            agent_type: Type of the agent (currently unused)
        """
        # Create default markdown files
        default_files = {
            "AGENTS.md": f"# Agent Guide\n\nDocument how {agent_id} should work, collaborate, and choose tools or skills.\n\n",
            "SOUL.md": f"# Soul\n\nDescribe {agent_id}'s temperament, reasoning posture, and voice.\n\n",
            "PROFILE.md": f"# Profile\n\nTrack {agent_id}'s long-lived investment style, preferences, and strengths.\n\n",
            "MEMORY.md": f"# Memory\n\nStore durable lessons, heuristics, and reminders for {agent_id}.\n\n",
            "HEARTBEAT.md": "# Heartbeat\n\nOptional checklist for periodic review or self-reflection.\n\n",
            "POLICY.md": "# Policy\n\nOptional run-scoped constraints, limits, or strategy policy.\n\n",
            "STYLE.md": "# Style\n\nOptional run-scoped communication or reasoning style.\n\n",
        }

        for filename, content in default_files.items():
            filepath = agent_dir / filename
            if not filepath.exists():
                filepath.write_text(content, encoding="utf-8")

    @staticmethod
    def _replace_agent_id(content: str, old_id: str, new_id: str) -> str:
        """Replace whole-token occurrences of ``old_id`` with ``new_id``.

        An occurrence only counts when it is not directly adjacent to a word
        character or hyphen, so an id such as "risk" is not rewritten inside
        "risky" or "risk-manager".
        """
        if not old_id or old_id == new_id:
            return content
        pattern = r"(?<![\w-])" + re.escape(old_id) + r"(?![\w-])"
        # A callable replacement keeps backslashes in ``new_id`` literal.
        return re.sub(pattern, lambda _match: new_id, content)

    def _clone_agent_files(self, source_path: str, target_dir: Path, new_agent_id: str) -> None:
        """Clone files from an existing agent.

        Copies the top-level ``*.md`` files (rewriting references to the
        source agent's directory name to ``new_agent_id``) and the regular
        files found directly inside each ``skills`` sub-directory.

        Args:
            source_path: Path to source agent directory
            target_dir: Target agent directory
            new_agent_id: ID for the new agent

        Raises:
            ValueError: If ``source_path`` does not exist
        """
        source_dir = Path(source_path)
        if not source_dir.exists():
            raise ValueError(f"Source path '{source_path}' does not exist")

        # Copy markdown files, updating agent references in the content
        source_name = source_dir.name
        for md_file in source_dir.glob("*.md"):
            content = md_file.read_text(encoding="utf-8")
            content = self._replace_agent_id(content, source_name, new_agent_id)
            (target_dir / md_file.name).write_text(content, encoding="utf-8")

        # Recreate the skills sub-directories and copy the files directly
        # inside them. Nested directories are not copied.
        # NOTE(review): if skills are stored as directories they are skipped
        # here - confirm this is intended.
        for skill_subdir in self._SKILL_SUBDIRS:
            source_skills = source_dir / "skills" / skill_subdir
            if not source_skills.exists():
                continue
            target_skills = target_dir / "skills" / skill_subdir
            target_skills.mkdir(parents=True, exist_ok=True)
            for skill_file in source_skills.iterdir():
                if skill_file.is_file():
                    shutil.copy2(skill_file, target_skills / skill_file.name)

    def _generate_role_config(self, agent_type: str) -> RoleConfig:
        """Generate role configuration for an agent type.

        Unknown types get a name derived from ``agent_type`` (underscores
        become spaces, title-cased) and empty description/lists. The lists
        are copied so that mutating the returned config never alters the
        class-level ``ROLE_TEMPLATES``.

        Args:
            agent_type: Type of agent

        Returns:
            RoleConfig instance
        """
        template = self.ROLE_TEMPLATES.get(agent_type, {})

        return RoleConfig(
            name=template.get("name", agent_type.replace("_", " ").title()),
            description=template.get("description", ""),
            focus_areas=list(template.get("focus_areas", [])),
            constraints=list(template.get("constraints", [])),
        )

    def _generate_role_md(self, agent_dir: Path, role_config: RoleConfig) -> None:
        """Generate ROLE.md file.

        Writes a title line followed by the description, "Focus Areas" and
        "Constraints" sections; each section is omitted when empty.

        Args:
            agent_dir: Agent directory
            role_config: Role configuration
        """
        lines = [f"# {role_config.name}", ""]

        if role_config.description:
            lines.extend([role_config.description, ""])

        if role_config.focus_areas:
            lines.extend(["## Focus Areas", ""])
            lines.extend(f"- {area}" for area in role_config.focus_areas)
            lines.append("")

        if role_config.constraints:
            lines.extend(["## Constraints", ""])
            lines.extend(f"- {constraint}" for constraint in role_config.constraints)
            lines.append("")

        content = "\n".join(lines)
        (agent_dir / "ROLE.md").write_text(content, encoding="utf-8")

    def _write_agent_yaml(
        self,
        config_path: Path,
        agent_id: str,
        agent_type: str,
        model_config: Optional[ModelConfig] = None,
    ) -> None:
        """Write agent.yaml configuration file.

        The ``model`` section is only written when ``model_config`` is given.

        Args:
            config_path: Path to write configuration
            agent_id: Agent ID
            agent_type: Agent type
            model_config: Optional model configuration
        """
        config = {
            "agent_id": agent_id,
            "agent_type": agent_type,
            "prompt_files": [
                "SOUL.md",
                "PROFILE.md",
                "AGENTS.md",
                "POLICY.md",
                "MEMORY.md",
            ],
            "enabled_skills": [],
            "disabled_skills": [],
            "active_tool_groups": [],
            "disabled_tool_groups": [],
        }

        if model_config is not None:
            config["model"] = {
                "name": model_config.model_name,
                "temperature": model_config.temperature,
                "max_tokens": model_config.max_tokens,
            }

        with open(config_path, "w", encoding="utf-8") as f:
            yaml.safe_dump(config, f, allow_unicode=True, sort_keys=False)