# -*- coding: utf-8 -*- """Hook system for EvoAgent. Provides pre_reasoning and post_acting hooks with built-in implementations: - BootstrapHook: First-time setup guidance - MemoryCompactionHook: Automatic memory compression Based on CoPaw's hooks design. """ from __future__ import annotations import logging from abc import ABC, abstractmethod from pathlib import Path from typing import Any, Callable, Dict, List, Optional, TYPE_CHECKING if TYPE_CHECKING: from agentscope.agent import ReActAgent logger = logging.getLogger(__name__) # Hook types HookType = str HOOK_PRE_REASONING: HookType = "pre_reasoning" HOOK_POST_ACTING: HookType = "post_acting" class Hook(ABC): """Abstract base class for agent hooks.""" @abstractmethod async def __call__( self, agent: "ReActAgent", kwargs: Dict[str, Any], ) -> Optional[Dict[str, Any]]: """Execute the hook. Args: agent: The agent instance kwargs: Input arguments to the method being hooked Returns: Modified kwargs or None to use original """ pass class HookManager: """Manages agent hooks. Provides registration and execution of hooks for different lifecycle events in the agent's operation. """ def __init__(self): self._hooks: Dict[HookType, List[tuple[str, Hook]]] = { HOOK_PRE_REASONING: [], HOOK_POST_ACTING: [], } def register( self, hook_type: HookType, hook_name: str, hook: Hook | Callable, ) -> None: """Register a hook. Args: hook_type: Type of hook (pre_reasoning, post_acting) hook_name: Unique name for this hook hook: Hook instance or callable """ # Remove existing hook with same name self._hooks[hook_type] = [ (name, h) for name, h in self._hooks[hook_type] if name != hook_name ] self._hooks[hook_type].append((hook_name, hook)) logger.debug("Registered hook '%s' for type '%s'", hook_name, hook_type) def unregister(self, hook_type: HookType, hook_name: str) -> bool: """Unregister a hook. Args: hook_type: Type of hook hook_name: Name of the hook to remove Returns: True if hook was found and removed """ original_len = len(self._hooks[hook_type]) self._hooks[hook_type] = [ (name, h) for name, h in self._hooks[hook_type] if name != hook_name ] removed = len(self._hooks[hook_type]) < original_len if removed: logger.debug("Unregistered hook '%s' from type '%s'", hook_name, hook_type) return removed async def execute( self, hook_type: HookType, agent: "ReActAgent", kwargs: Dict[str, Any], ) -> Dict[str, Any]: """Execute all hooks of a given type. Args: hook_type: Type of hooks to execute agent: The agent instance kwargs: Input arguments Returns: Potentially modified kwargs """ for name, hook in self._hooks[hook_type]: try: result = await hook(agent, kwargs) if result is not None: kwargs = result except Exception as e: logger.error("Hook '%s' failed: %s", name, e, exc_info=True) return kwargs def list_hooks(self, hook_type: Optional[HookType] = None) -> List[str]: """List registered hook names. Args: hook_type: Optional type to filter by Returns: List of hook names """ if hook_type: return [name for name, _ in self._hooks.get(hook_type, [])] names = [] for hooks in self._hooks.values(): names.extend([name for name, _ in hooks]) return names class BootstrapHook(Hook): """Hook for bootstrap guidance on first user interaction. This hook looks for a BOOTSTRAP.md file in the working directory and if found, prepends guidance to the first user message to help establish the agent's identity and user preferences. """ def __init__( self, workspace_dir: Path, language: str = "zh", ): """Initialize bootstrap hook. Args: workspace_dir: Working directory containing BOOTSTRAP.md language: Language code for bootstrap guidance (en/zh) """ self.workspace_dir = Path(workspace_dir) self.language = language self._completed_flag = self.workspace_dir / ".bootstrap_completed" def _is_first_user_interaction(self, agent: "ReActAgent") -> bool: """Check if this is the first user interaction. Args: agent: The agent instance Returns: True if first user interaction """ if not hasattr(agent, "memory") or not agent.memory.content: return True # Count user messages (excluding system) user_count = sum( 1 for msg, _ in agent.memory.content if msg.role == "user" ) return user_count <= 1 def _build_bootstrap_guidance(self) -> str: """Build bootstrap guidance message. Returns: Formatted bootstrap guidance """ if self.language == "zh": return ( "# 引导模式\n" "\n" "工作目录中存在 `BOOTSTRAP.md` — 首次设置。\n" "\n" "1. 阅读 BOOTSTRAP.md,友好地表示初次见面," "引导用户完成设置。\n" "2. 按照 BOOTSTRAP.md 的指示," "帮助用户定义你的身份和偏好。\n" "3. 按指南创建/更新必要文件" "(PROFILE.md、MEMORY.md 等)。\n" "4. 完成后删除 BOOTSTRAP.md。\n" "\n" "如果用户希望跳过,直接回答下面的问题即可。\n" "\n" "---\n" "\n" ) return ( "# BOOTSTRAP MODE\n" "\n" "`BOOTSTRAP.md` exists — first-time setup.\n" "\n" "1. Read BOOTSTRAP.md, greet the user, " "and guide them through setup.\n" "2. Follow BOOTSTRAP.md instructions " "to define identity and preferences.\n" "3. Create/update files " "(PROFILE.md, MEMORY.md, etc.) as described.\n" "4. Delete BOOTSTRAP.md when done.\n" "\n" "If the user wants to skip, answer their " "question directly instead.\n" "\n" "---\n" "\n" ) async def __call__( self, agent: "ReActAgent", kwargs: Dict[str, Any], ) -> Optional[Dict[str, Any]]: """Check and load BOOTSTRAP.md on first user interaction. Args: agent: The agent instance kwargs: Input arguments to the _reasoning method Returns: None (hook doesn't modify kwargs) """ try: bootstrap_path = self.workspace_dir / "BOOTSTRAP.md" # Check if bootstrap has already been triggered if self._completed_flag.exists(): return None if not bootstrap_path.exists(): return None if not self._is_first_user_interaction(agent): return None bootstrap_guidance = self._build_bootstrap_guidance() logger.debug("Found BOOTSTRAP.md [%s], prepending guidance", self.language) # Prepend to first user message in memory if hasattr(agent, "memory") and agent.memory.content: system_count = sum( 1 for msg, _ in agent.memory.content if msg.role == "system" ) for msg, _ in agent.memory.content[system_count:]: if msg.role == "user": # Prepend guidance to message content original_content = msg.content msg.content = bootstrap_guidance + original_content break logger.debug("Bootstrap guidance prepended to first user message") # Create completion flag to prevent repeated triggering self._completed_flag.touch() logger.debug("Created bootstrap completion flag") except Exception as e: logger.error("Failed to process bootstrap: %s", e, exc_info=True) return None class MemoryCompactionHook(Hook): """Hook for automatic memory compaction when context is full. This hook monitors the token count of messages and triggers compaction when it exceeds the threshold. It preserves the system prompt and recent messages while summarizing older conversation history. """ def __init__( self, memory_manager: Any, memory_compact_threshold: Optional[int] = None, memory_compact_reserve: Optional[int] = None, enable_tool_result_compact: bool = False, tool_result_compact_keep_n: int = 5, ): """Initialize memory compaction hook. Args: memory_manager: Memory manager instance for compaction memory_compact_threshold: Token threshold for compaction memory_compact_reserve: Reserve tokens for recent messages enable_tool_result_compact: Enable tool result compaction tool_result_compact_keep_n: Number of tool results to keep """ self.memory_manager = memory_manager self.memory_compact_threshold = memory_compact_threshold self.memory_compact_reserve = memory_compact_reserve self.enable_tool_result_compact = enable_tool_result_compact self.tool_result_compact_keep_n = tool_result_compact_keep_n async def __call__( self, agent: "ReActAgent", kwargs: Dict[str, Any], ) -> Optional[Dict[str, Any]]: """Pre-reasoning hook to check and compact memory if needed. Args: agent: The agent instance kwargs: Input arguments to the _reasoning method Returns: None (hook doesn't modify kwargs) """ try: if not hasattr(agent, "memory") or not self.memory_manager: return None memory = agent.memory # Get current token count estimate messages = await memory.get_memory() total_tokens = self._estimate_tokens(messages) if self.memory_compact_threshold is None: return None if total_tokens < self.memory_compact_threshold: return None logger.info( "Memory compaction triggered: %d tokens (threshold: %d)", total_tokens, self.memory_compact_threshold, ) # Compact memory await self._compact_memory(agent, messages) except Exception as e: logger.error("Failed to compact memory: %s", e, exc_info=True) return None def _estimate_tokens(self, messages: List[Any]) -> int: """Estimate token count for messages. Args: messages: List of messages Returns: Estimated token count """ # Simple estimation: ~4 chars per token total_chars = sum( len(str(getattr(msg, "content", ""))) for msg in messages ) return total_chars // 4 async def _compact_memory( self, agent: "ReActAgent", messages: List[Any], ) -> None: """Compact memory by summarizing older messages. Args: agent: The agent instance messages: Current messages in memory """ if self.memory_compact_reserve is None: return # Keep recent messages keep_count = min( len(messages) // 4, 10, # Max 10 recent messages ) keep_count = max(keep_count, 2) # At least 2 messages_to_compact = messages[:-keep_count] if keep_count < len(messages) else [] if not messages_to_compact: return # Use memory manager to compact if available if hasattr(self.memory_manager, "compact_memory"): try: summary = await self.memory_manager.compact_memory( messages=messages_to_compact, ) logger.info("Memory compacted: %d messages summarized", len(messages_to_compact)) # Mark messages as compressed if supported if hasattr(agent.memory, "update_messages_mark"): from agentscope.agent._react_agent import _MemoryMark await agent.memory.update_messages_mark( new_mark=_MemoryMark.COMPRESSED, msg_ids=[msg.id for msg in messages_to_compact], ) except Exception as e: logger.error("Memory manager compaction failed: %s", e) __all__ = [ "Hook", "HookManager", "HookType", "HOOK_PRE_REASONING", "HOOK_POST_ACTING", "BootstrapHook", "MemoryCompactionHook", ]