# -*- coding: utf-8 -*-
"""
Meta Planner agent class that can handle complicated tasks with
planning-execution pattern.
"""
import json
import os
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Literal, Optional

from _planning_tools import (
    PlannerNoteBook,  # pylint: disable=C0411
    RoadmapManager,
    WorkerManager,
    share_tools,
)
from agentscope.agent import ReActAgent
from agentscope.formatter import FormatterBase
from agentscope.memory import MemoryBase
from agentscope.message import Msg, TextBlock, ToolResultBlock, ToolUseBlock
from agentscope.model import ChatModelBase
from agentscope.tool import Toolkit, ToolResponse

PlannerStage = Literal["post_reasoning", "post_action", "pre_reasoning"]


def _infer_planner_stage_with_msg(
    cur_msg: Msg,
) -> tuple[PlannerStage, list[str]]:
    """
    Infer the planner stage and extract tool names from a message.

    Analyzes a message to determine the current stage of the planner
    workflow and extracts any tool names if tool calls are present in the
    message.

    Args:
        cur_msg (Msg): The message to analyze for stage inference.

    Returns:
        tuple[PlannerStage, list[str]]: A tuple containing:
            - PlannerStage: One of "pre_reasoning", "post_reasoning", or
              "post_action"
            - list[str]: List of tool names found in tool_use or
              tool_result blocks

    Note:
        - "pre_reasoning": system or user role messages with string content
        - "post_reasoning": messages with tool_use blocks or plain text
          content (including string content from any other role)
        - "post_action": messages with tool_result blocks
        - Tool names are extracted from both tool_use and tool_result
          blocks; a block without a "name" is reported as "no_name_tool"
    """
    blocks = cur_msg.content
    if isinstance(blocks, str):
        if cur_msg.role in ["system", "user"]:
            return "pre_reasoning", []
        # String content holds no content blocks, hence no tool names.
        # Iterating it would yield characters, which cannot be indexed by
        # "type", so treat it as plain text output of a reasoning step.
        return "post_reasoning", []

    cur_tool_names = [
        str(b.get("name", "no_name_tool"))
        for b in blocks
        if b["type"] in ["tool_use", "tool_result"]
    ]
    if cur_msg.has_content_blocks("tool_result"):
        return "post_action", cur_tool_names
    # Both tool_use blocks and plain text blocks mark the end of a
    # reasoning step.
    return "post_reasoning", cur_tool_names


def update_user_input_pre_reply_hook(
    self: "MetaPlanner",
    kwargs: dict[str, Any],
) -> None:
    """Hook for loading user input to planner notebook.

    Appends the content of the ``msg`` argument given to ``reply`` (a single
    ``Msg`` or a list of ``Msg``) to ``planner_notebook.user_input``. Any
    other value of ``msg`` (e.g. ``None``) is ignored.
    """
    msg = kwargs.get("msg", None)
    if isinstance(msg, Msg):
        msg = [msg]
    if isinstance(msg, list):
        for m in msg:
            self.planner_notebook.user_input.append(m.content)


def _save_state_to_file(self: "MetaPlanner", file_name: str) -> None:
    """Write ``self.state_dict()`` as JSON to ``state_saving_dir/file_name``.

    Creates ``state_saving_dir`` if needed. Callers must check that
    ``state_saving_dir`` is set before calling this helper.
    """
    os.makedirs(self.state_saving_dir, exist_ok=True)
    file_path = os.path.join(self.state_saving_dir, file_name)
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(self.state_dict(), f, ensure_ascii=False, indent=4)


def planner_save_post_reasoning_state(
    self: "MetaPlanner",
    reasoning_input: dict[str, Any],  # pylint: disable=W0613
    reasoning_output: Msg,
) -> None:
    """Hook func for save state after reasoning step.

    When ``state_saving_dir`` is set, dumps the agent state to
    ``state-{stage}-{YYYYmmddHHMMSS}.json``, where ``stage`` is inferred
    from ``reasoning_output``. Does nothing otherwise.
    """
    if self.state_saving_dir:
        cur_stage, _ = _infer_planner_stage_with_msg(reasoning_output)
        # NOTE: second resolution; two saves of the same stage within one
        # second write to the same file.
        time_str = datetime.now().strftime("%Y%m%d%H%M%S")
        _save_state_to_file(self, f"state-{cur_stage}-{time_str}.json")


async def planner_load_state_pre_reasoning_hook(
    self: "MetaPlanner",  # pylint: disable=W0613
    *args: Any,
    **kwargs: Any,
) -> None:
    """Hook func for loading saved state before reasoning step.

    If the last message in memory is inferred to be a reasoning output
    (e.g. the state was saved right after a reasoning step), it is moved
    from memory into ``state_loading_reasoning_msg``.
    ``planner_load_state_post_reasoning_hook`` puts it back after the
    reasoning step.
    """
    mem_msgs = await self.memory.get_memory()
    if len(mem_msgs) > 0:
        stage, _ = _infer_planner_stage_with_msg(mem_msgs[-1])
        if stage == "post_reasoning":
            self.state_loading_reasoning_msg = mem_msgs[-1]
            # delete the last reasoning message to avoid error when
            # calling model in reasoning step
            await self.memory.delete(len(mem_msgs) - 1)


async def planner_load_state_post_reasoning_hook(
    self: "MetaPlanner",  # pylint: disable=W0613
    *args: Any,
    **kwargs: Any,
) -> Optional[Msg]:
    """Hook func for loading saved state after reasoning step.

    If ``state_loading_reasoning_msg`` is set, the last message in memory
    (the newly generated reasoning message) is replaced by the buffered
    one, the buffer is cleared and the buffered message is returned.
    Otherwise returns ``None``.
    """
    if self.state_loading_reasoning_msg is None:
        return None
    num_msgs = await self.memory.size()
    # replace the newly generated reasoning message with the loaded one
    await self.memory.delete(num_msgs - 1)
    old_reasoning_msg = self.state_loading_reasoning_msg
    await self.memory.add(old_reasoning_msg)
    self.state_loading_reasoning_msg = None
    return old_reasoning_msg


async def planner_compose_reasoning_msg_pre_reasoning_hook(
    self: "MetaPlanner",  # pylint: disable=W0613
    *args: Any,
    **kwargs: Any,
) -> None:
    """Hook func for composing msg for reasoning step.

    Adds a "user" message to memory that contains all collected user input
    and a JSON dump of the planner notebook (without ``user_input`` and
    ``full_tool_list``).
    """
    notebook_string = self.planner_notebook.model_dump_json(
        exclude={"user_input", "full_tool_list"},
        indent=2,
    )
    reasoning_info = (
        "## All User Input\n{all_user_input}\n\n"
        "## Session Context\n"
        "```json\n{notebook_string}\n```\n\n"
    ).format_map(
        {
            "notebook_string": notebook_string,
            "all_user_input": self.planner_notebook.user_input,
        },
    )
    reasoning_msg = Msg(
        "user",
        content=reasoning_info,
        role="user",
    )
    await self.memory.add(reasoning_msg)


async def planner_remove_reasoning_msg_post_reasoning_hook(
    self: "MetaPlanner",  # pylint: disable=W0613
    *args: Any,
    **kwargs: Any,
) -> None:
    """Hook func for removing msg for reasoning step.

    Deletes the second-to-last message in memory when there are at least
    two messages. This is expected to be the context message added by
    ``planner_compose_reasoning_msg_pre_reasoning_hook``, followed by the
    reasoning output.
    """
    num_msgs = await self.memory.size()
    if num_msgs > 1:
        # remove the msg added by
        # planner_compose_reasoning_msg_pre_reasoning_hook
        await self.memory.delete(num_msgs - 2)


def planner_save_post_action_state(
    self: "MetaPlanner",
    action_input: dict[str, Any],
    tool_output: Optional[Msg],  # pylint: disable=W0613
) -> None:
    """Hook func for save state after action step.

    When ``state_saving_dir`` is set, dumps the agent state to
    ``state-post-action-{tool_name}-{YYYYmmddHHMMSS}.json``. The tool name
    is taken from ``action_input["tool_call"]["name"]``; "no_name_tool" is
    used if either key is missing.
    """
    if self.state_saving_dir:
        tool_call = action_input.get("tool_call") or {}
        tool_name = tool_call.get("name", "no_name_tool")
        time_str = datetime.now().strftime("%Y%m%d%H%M%S")
        _save_state_to_file(
            self,
            f"state-post-action-{tool_name}-{time_str}.json",
        )


class MetaPlanner(ReActAgent):
    """
    A meta-planning agent that extends ReActAgent with enhanced planning
    capabilities.

    The MetaPlanner is designed to handle complex multistep planning tasks
    by leveraging a combination of reasoning and action capabilities. The
    subtasks are solved by dynamically creating ReAct worker agents and
    providing them with the necessary tools.
    """

    def __init__(
        self,
        name: str,
        model: ChatModelBase,
        worker_full_toolkit: Toolkit,
        formatter: FormatterBase,
        memory: MemoryBase,
        toolkit: Toolkit,
        agent_working_dir: str,
        sys_prompt: Optional[str] = None,
        max_iters: int = 10,
        state_saving_dir: Optional[str] = None,
        planner_mode: Literal["disable", "dynamic", "enforced"] = "dynamic",
    ) -> None:
        """
        Initialize the MetaPlanner with the given parameters.

        Args:
            name (str):
                The name identifier for this agent instance; "None" falls
                back to "Task-Meta-Planner".
            model (ChatModelBase):
                The primary chat model used for reasoning and response
                generation.
            worker_full_toolkit (Toolkit):
                Complete set of tools available to the worker agent.
            formatter (FormatterBase):
                Formatter for formatting messages to the model API
                provider's format.
            memory (MemoryBase):
                Memory system for storing conversation history and context.
            toolkit (Toolkit):
                Toolkit for managing tools available to the agent.
            agent_working_dir (str):
                Root directory for agent's file operations.
            sys_prompt (str, optional):
                Meta planner's system prompt. A default prompt is used when
                None.
            max_iters (int, optional):
                Maximum number of planning iterations. Defaults to 10.
            state_saving_dir (Optional[str], optional):
                Directory to save the agent's state after each reasoning
                and action step. Defaults to None (no saving).
            planner_mode (Literal["disable", "dynamic", "enforced"]):
                "disable" registers no planning tools or hooks; "dynamic"
                registers the planning tools inactive and exposes
                `enter_solving_complicated_task_mode` so the agent can
                switch into planning mode itself; "enforced" activates the
                planning tools and the long system prompt immediately.
                Defaults to "dynamic".
        """
        name = "Task-Meta-Planner" if name is None else name
        if sys_prompt is None:
            sys_prompt = (
                "You are a helpful assistant named Task-Meta-Planner. "
                "If a given task can not be done easily, then you may need "
                "to use the tool `enter_solving_complicated_task_mode` to "
                "change yourself to a more long-term planning mode."
            )

        # Call super().__init__() early to initialize StateModule attributes
        super().__init__(
            name=name,
            sys_prompt=sys_prompt,
            model=model,
            formatter=formatter,
            memory=memory,
            toolkit=toolkit,
            max_iters=max_iters,
        )
        self.agent_working_dir_root = agent_working_dir
        self.task_dir = self.agent_working_dir_root
        self.worker_full_toolkit = worker_full_toolkit
        self.state_saving_dir = state_saving_dir

        # if we load a trajectory and the last step was reasoning,
        # then we need a buffer to store the reasoning message and replace
        # with this message after reasoning
        self.state_loading_reasoning_msg: Optional[Msg] = None

        # The configured mode and whether the agent has already switched
        # into planning-execution mode; both are registered as state so
        # they are kept across state saving/loading (debugging & resume).
        self.planner_mode = planner_mode
        self.in_planner_mode = False
        self.register_state("planner_mode")
        self.register_state("in_planner_mode")

        self.planner_notebook = None
        self.roadmap_manager, self.worker_manager = None, None
        if planner_mode in ["dynamic", "enforced"]:
            self.planner_notebook = PlannerNoteBook()
            self.prepare_planner_tools(planner_mode)
            self.register_state(
                "planner_notebook",
                lambda x: x.model_dump(),
                lambda x: PlannerNoteBook(**x),
            )

            # pre-reply hook
            self.register_instance_hook(
                "pre_reply",
                "update_user_input_to_notebook_pre_reply_hook",
                update_user_input_pre_reply_hook,
            )
            # pre-reasoning hook
            self.register_instance_hook(
                "pre_reasoning",
                "planner_load_state_pre_reasoning_hook",
                planner_load_state_pre_reasoning_hook,
            )
            self.register_instance_hook(
                "pre_reasoning",
                "planner_compose_reasoning_msg_pre_reasoning_hook",
                planner_compose_reasoning_msg_pre_reasoning_hook,
            )
            # post_reasoning hook
            self.register_instance_hook(
                "post_reasoning",
                "planner_load_state_post_reasoning_hook",
                planner_load_state_post_reasoning_hook,
            )
            self.register_instance_hook(
                "post_reasoning",
                "planner_remove_reasoning_msg_post_reasoning_hook",
                planner_remove_reasoning_msg_post_reasoning_hook,
            )
            self.register_instance_hook(
                "post_reasoning",
                "save_state_post_reasoning_hook",
                planner_save_post_reasoning_state,
            )
            # post_action_hook
            self.register_instance_hook(
                "post_acting",
                "save_state_post_action_hook",
                planner_save_post_action_state,
            )

    def prepare_planner_tools(
        self,
        planner_mode: Literal["disable", "enforced", "dynamic"],
    ) -> None:
        """
        Prepare the planning tools depending on the selected mode.

        (Re)creates the roadmap and worker managers, re-registers their
        tools in a fresh "planning" tool group, and then:
        - "dynamic": registers `enter_solving_complicated_task_mode` (once)
          and deactivates the "planning" group;
        - "enforced": activates the "planning" group and switches to the
          long planning system prompt.
        """
        self.roadmap_manager = RoadmapManager(
            planner_notebook=self.planner_notebook,
        )
        self.worker_manager = WorkerManager(
            worker_model=self.model,
            worker_formatter=self.formatter,
            planner_notebook=self.planner_notebook,
            agent_working_dir=self.task_dir,
            worker_full_toolkit=self.worker_full_toolkit,
        )
        # clean
        self.toolkit.remove_tool_groups("planning")
        self.toolkit.create_tool_group(
            "planning",
            "Tool group for planning capability",
        )
        # re-register planning tools so they are bound to the new managers
        planning_tool_funcs = (
            self.roadmap_manager.decompose_task_and_build_roadmap,
            self.roadmap_manager.revise_roadmap,
            self.roadmap_manager.get_next_unfinished_subtask_from_roadmap,
            self.worker_manager.show_current_worker_pool,
            self.worker_manager.create_worker,
            self.worker_manager.execute_worker,
        )
        for tool_func in planning_tool_funcs:
            self.toolkit.register_tool_function(
                tool_func,
                group_name="planning",
            )

        if planner_mode == "dynamic":
            if "enter_solving_complicated_task_mode" not in self.toolkit.tools:
                self.toolkit.register_tool_function(
                    self.enter_solving_complicated_task_mode,
                )
            # Only activate after agent decides to enter the
            # planning-execution mode
            self.toolkit.update_tool_groups(["planning"], False)
        elif planner_mode == "enforced":
            self.toolkit.update_tool_groups(["planning"], True)
            # use the self.agent_working_dir as working dir
            self._update_toolkit_and_sys_prompt()

    def _ensure_file_system_functions(self) -> None:
        """Check the worker toolkit has the file system tools and share them.

        Raises:
            ValueError: If any required file system tool is missing from
                ``worker_full_toolkit``.
        """
        required_tool_list = [
            "read_file",
            "write_file",
            "edit_file",
            "create_directory",
            "list_directory",
            "directory_tree",
            "list_allowed_directories",
        ]
        for tool_name in required_tool_list:
            if tool_name not in self.worker_full_toolkit.tools:
                raise ValueError(
                    f"{tool_name} must be in the worker toolkit and "
                    "its tool group must be active for solving "
                    "complicated tasks.",
                )
        share_tools(self.worker_full_toolkit, self.toolkit, required_tool_list)

    async def enter_solving_complicated_task_mode(
        self,
        task_name: str,
    ) -> ToolResponse:
        """
        When the user task meets any of the following conditions, enter the
        solving complicated task mode by using this tool.
        1. the task cannot be done within 5 reasoning-acting iterations;
        2. the task cannot be done by the current tools you can see;
        3. the task is related to comprehensive research or information
        gathering

        Args:
            task_name (`str`):
                Give a name to the current task as an indicator. Because
                this name will be used to create a directory, so try to use
                "_" instead of space between words, e.g. "A_NEW_TASK".
        """
        # build directory for the task
        self._ensure_file_system_functions()
        # NOTE(review): task_name comes from the model and is joined into a
        # path unchecked; consider validating it if the model is untrusted.
        self.task_dir = os.path.join(
            self.agent_working_dir_root,
            task_name,
        )
        self.worker_manager.agent_working_dir = self.task_dir
        create_task_dir = ToolUseBlock(
            type="tool_use",
            id=str(uuid.uuid4()),
            name="create_directory",
            input={
                "path": self.task_dir,
            },
        )
        tool_res = await self.toolkit.call_tool_function(create_task_dir)
        tool_res_msg = Msg(
            "system",
            content=[
                ToolResultBlock(
                    type="tool_result",
                    output=[],
                    name="create_directory",
                    id=create_task_dir["id"],
                ),
            ],
            role="system",
        )
        async for chunk in tool_res:
            # Turn into a tool result block; keep the latest chunk content
            tool_res_msg.content[0]["output"] = chunk.content
        await self.print(tool_res_msg)

        self._update_toolkit_and_sys_prompt()
        return ToolResponse(
            metadata={"success": True},
            content=[
                TextBlock(
                    type="text",
                    text=(
                        "Successfully enter the planning-execution mode to "
                        "solve complicated task. "
                        "All the file operations, including "
                        "read/write/modification, should be done in directory "
                        f"{self.task_dir}"
                    ),
                ),
            ],
        )

    def _update_toolkit_and_sys_prompt(self) -> None:
        """Switch the agent into planning-execution mode.

        Stores the worker tool list in the planner notebook, loads the long
        system prompt from ``_built_in_long_sys_prompt/
        meta_planner_sys_prompt.md`` (filling in ``{tool_list}``), activates
        the "planning" tool group and sets ``in_planner_mode``.
        """
        # change agent settings for solving complicated task
        full_worker_tool_list = [
            {
                "tool_name": func_dict.get("function", {}).get("name", ""),
                "description": func_dict.get("function", {}).get(
                    "description",
                    "",
                ),
            }
            for func_dict in self.worker_full_toolkit.get_json_schemas()
        ]
        self.planner_notebook.full_tool_list = full_worker_tool_list
        prompt_path = (
            Path(__file__).parent
            / "_built_in_long_sys_prompt"
            / "meta_planner_sys_prompt.md"
        )
        with open(prompt_path, "r", encoding="utf-8") as f:
            sys_prompt = f.read()
        sys_prompt = sys_prompt.format_map(
            {
                "tool_list": json.dumps(
                    full_worker_tool_list,
                    ensure_ascii=False,
                ),
            },
        )
        self._sys_prompt = sys_prompt  # pylint: disable=W0201
        self.toolkit.update_tool_groups(["planning"], True)
        self.in_planner_mode = True

    def resume_planner_tools(self) -> None:
        """Resume the planner notebook for tools.

        Rebuilds the planning tools for the current ``planner_mode`` and, if
        the agent had already entered planning mode, restores the long
        system prompt. Does nothing when planning is disabled, matching
        ``__init__`` which sets up no planner in that mode.
        """
        if self.planner_mode not in ["dynamic", "enforced"]:
            return
        self.prepare_planner_tools(self.planner_mode)
        if self.in_planner_mode:
            self._update_toolkit_and_sys_prompt()