# -*- coding: utf-8 -*- """Deep Research Agent""" # pylint: disable=too-many-lines, no-name-in-module import asyncio import json import os from copy import deepcopy from datetime import datetime from typing import Any, Optional, Tuple, Type import shortuuid from agentscope import logger, setup_logger from agentscope.agent import ReActAgent from agentscope.formatter import FormatterBase from agentscope.mcp import StatefulClientBase from agentscope.memory import MemoryBase from agentscope.message import Msg, TextBlock, ToolResultBlock, ToolUseBlock from agentscope.model import ChatModelBase from agentscope.tool import ToolResponse, view_text_file, write_text_file from pydantic import BaseModel from ..agent_deep_research.utils import ( get_dynamic_tool_call_json, get_structure_output, load_prompt_dict, truncate_search_result, ) from .built_in_prompt.promptmodule import ( FollowupJudge, ReflectFailure, SubtasksDecomposition, WebExtraction, ) _DEEP_RESEARCH_AGENT_DEFAULT_SYS_PROMPT = "You're a helpful assistant." _LOG_DIR = os.path.join(os.path.dirname(__file__), "log") _LOG_PATH = os.path.join( _LOG_DIR, f"log_{datetime.now().strftime('%y%m%d%H%M%S')}.md", ) os.makedirs(_LOG_DIR, exist_ok=True) setup_logger(level="INFO", filepath=_LOG_PATH) class SubTaskItem(BaseModel): """Subtask item of deep research agent.""" objective: str working_plan: Optional[str] = None knowledge_gaps: Optional[str] = None class DeepResearchAgent(ReActAgent): """ Deep Research Agent for sophisticated research tasks. Example: .. code-block:: python agent = DeepResearchAgent( name="Friday", sys_prompt="You are a helpful assistant named Friday.", model=my_chat_model, formatter=my_chat_formatter, memory=InMemoryMemory(), search_mcp_client=my_tavily_search_client, tmp_file_storage_dir=agent_working_dir, ) response = await agent( Msg( name=“user”, content="Please give me a survey of the LLM-empowered agent.", role=“user” ) ) ``` """ def __init__( self, name: str, model: ChatModelBase, formatter: FormatterBase, memory: MemoryBase, search_mcp_client: StatefulClientBase, sys_prompt: str = _DEEP_RESEARCH_AGENT_DEFAULT_SYS_PROMPT, max_iters: int = 30, max_depth: int = 3, tmp_file_storage_dir: str = "tmp", ) -> None: """Initialize the Deep Research Agent. Args: name (str): The unique identifier name for the agent instance. model (ChatModelBase): The chat model used for generating responses and reasoning. formatter (FormatterBase): The formatter used to convert messages into the required format for the model API. memory (MemoryBase): The memory component used to store and retrieve dialogue history. search_mcp_client (StatefulClientBase): The mcp client used to provide the tools for deep search. sys_prompt (str, optional): The system prompt that defines the agent's behavior and personality. Defaults to _DEEP_RESEARCH_AGENT_DEFAULT_SYS_PROMPT. max_iters (int, optional): The maximum number of reasoning-acting loop iterations. Defaults to 30. max_depth (int, optional): The maximum depth of query expansion during deep searching. Defaults to 3. tmp_file_storage_dir (str, optional): The storage dir for generated files. Default to 'tmp' Returns: None """ # initialization of prompts self.prompt_dict = load_prompt_dict() # Enhance the system prompt for deep research agent add_note = self.prompt_dict["add_note"].format_map( {"finish_function_name": f"`{self.finish_function_name}`"}, ) tool_use_rule = self.prompt_dict["tool_use_rule"].format_map( {"tmp_file_storage_dir": tmp_file_storage_dir}, ) sys_prompt = f"{sys_prompt}\n{add_note}\n{tool_use_rule}" super().__init__( name=name, sys_prompt=sys_prompt, model=model, formatter=formatter, memory=memory, max_iters=max_iters, ) self.max_depth = max_depth self.memory = memory self.tmp_file_storage_dir = tmp_file_storage_dir self.current_subtask = [] # register all necessary tools for deep research agent self.toolkit.register_tool_function(view_text_file) self.toolkit.register_tool_function(write_text_file) asyncio.create_task( self.toolkit.register_mcp_client(search_mcp_client), ) self.search_function = "tavily-search" self.extract_function = "tavily-extract" self.read_file_function = "view_text_file" self.write_file_function = "write_text_file" self.summarize_function = "summarize_intermediate_results" self.intermediate_memory = [] self.report_path_based = self.name + datetime.now().strftime( "%y%m%d%H%M%S", ) self.report_index = 1 self._required_structured_model = None self.user_query = None # add functions into toolkit self.toolkit.register_tool_function(self.reflect_failure) self.toolkit.register_tool_function( self.summarize_intermediate_results, ) async def reply( self, msg: Msg | list[Msg] | None = None, structured_model: Type[BaseModel] | None = None, ) -> Msg: """The reply method of the agent.""" # Maintain the subtask list self.user_query = msg.get_text_content() self.current_subtask.append( SubTaskItem(objective=self.user_query), ) # Identify the expected output and generate a meta_planner_agent await self.decompose_and_expand_subtask() msg.content += ( f"\nExpected Output:\n{self.current_subtask[0].knowledge_gaps}" ) # Add user query message to memory await self.memory.add(msg) # type: ignore # Record structured output model if provided if structured_model: self._required_structured_model = structured_model self.toolkit.set_extended_model( self.finish_function_name, structured_model, ) for _ in range(self.max_iters): # Generate the working meta_planner_agent first if not self.current_subtask[-1].working_plan: await self.decompose_and_expand_subtask() # Write the instruction for reasoning cur_plan = self.current_subtask[-1].working_plan cur_know_gap = self.current_subtask[-1].knowledge_gaps reasoning_prompt = self.prompt_dict["reasoning_prompt"].format_map( { "objective": self.current_subtask[-1].objective, "meta_planner_agent": ( cur_plan if cur_plan else "There is no working meta_planner_agent now." ), "knowledge_gap": ( f"## Knowledge Gaps:\n {cur_know_gap}" if cur_know_gap else "" ), "depth": len(self.current_subtask), }, ) reasoning_prompt_msg = Msg( "user", content=[ TextBlock( type="text", text=reasoning_prompt, ), ], role="user", ) self.intermediate_memory.append(reasoning_prompt_msg) # Reasoning to generate tool calls backup_memory = deepcopy(self.memory) # type: ignore await self.memory.add(self.intermediate_memory) # type: ignore msg_reasoning = await self._reasoning() self.memory = backup_memory # Calling the tools for tool_call in msg_reasoning.get_content_blocks("tool_use"): self.intermediate_memory.append( Msg( self.name, content=[tool_call], role="assistant", ), ) # add tool_use memory msg_response = await self._acting(tool_call) if msg_response: await self.memory.add(msg_response) self.current_subtask = [] return msg_response # When the maximum iterations are reached, summarize all the findings return await self._summarizing() async def _acting(self, tool_call: ToolUseBlock) -> Msg | None: """ Execute a tool call and process its response with browser-specific handling. Args: tool_call (ToolUseBlock): The tool use block containing the tool name, parameters, and unique identifier for execution. Returns: Msg | None: Returns a response message if the finish function is called successfully, otherwise returns None to continue the reasoning-acting loop. """ tool_res_msg = Msg( "system", [ ToolResultBlock( type="tool_result", id=tool_call["id"], name=tool_call["name"], output=[], ), ], "system", ) update_memory = False intermediate_report = "" chunk = "" try: # Execute the tool call tool_res = await self.toolkit.call_tool_function(tool_call) # Async generator handling async for chunk in tool_res: # Turn into a tool result block tool_res_msg.content[0]["output"] = chunk.content # type: ignore[index] # Skip the printing of the finish function call if ( tool_call["name"] != self.finish_function_name or tool_call["name"] == self.finish_function_name and not chunk.metadata.get("success") ): await self.print(tool_res_msg, chunk.is_last) # Return message if generate_response is called successfully if tool_call[ "name" ] == self.finish_function_name and chunk.metadata.get( "success", True, ): if len(self.current_subtask) == 0: return chunk.metadata.get("response_msg") # Summarize intermediate results into a draft report elif tool_call["name"] == self.summarize_function: self.intermediate_memory = [] await self.memory.add( Msg( "assistant", [ TextBlock( type="text", text=chunk.content[0]["text"], ), ], "assistant", ), ) # Truncate the web extract results that exceeds max length elif tool_call["name"] in [ self.search_function, self.extract_function, ]: tool_res_msg.content[0]["output"] = truncate_search_result( tool_res_msg.content[0]["output"], ) # Update memory when an intermediate report is generated if isinstance(chunk.metadata, dict) and chunk.metadata.get( "update_memory", ): update_memory = True intermediate_report = chunk.metadata.get( "intermediate_report", ) return None finally: # Record the tool result message in the intermediate memory if tool_call["name"] != self.summarize_function: self.intermediate_memory.append(tool_res_msg) # Read more information from the web page if necessary if tool_call["name"] == self.search_function: extract_res = await self._follow_up(chunk.content, tool_call) if isinstance( extract_res.metadata, dict, ) and extract_res.metadata.get("update_memory"): self.intermediate_memory = [] await self.memory.add( Msg( "assistant", content=[ TextBlock( type="text", text=extract_res.metadata.get( "intermediate_report", ).content[0]["text"], ), ], role="assistant", ), ) # Update memory with the intermediate report if update_memory: self.intermediate_memory = [] await self.memory.add( Msg( "assistant", content=[ TextBlock( type="text", text=intermediate_report.content[0]["text"], ), ], role="assistant", ), ) async def get_model_output( self, msgs: list, format_template: Type[BaseModel] = None, stream: bool = True, ) -> Any: """ Call the model and get output with or without a structured format. Args: msgs (list): A list of messages. format_template (BaseModel): structured format. stream (bool): stream-style output. """ blocks = None if format_template: res = await self.model( await self.formatter.format(msgs=msgs), tools=get_dynamic_tool_call_json( format_template, ), ) if stream: async for content_chunk in res: blocks = content_chunk.content else: blocks = res.content return get_structure_output(blocks) else: res = await self.model( await self.formatter.format(msgs=msgs), ) if stream: async for content_chunk in res: blocks = content_chunk.content else: blocks = res.content return blocks async def call_specific_tool( self, func_name: str, params: dict = None, ) -> Tuple[Msg, Msg]: """ Call the specific tool in toolkit. Args: func_name (str): name of the tool. params (dict): input parameters of the tool. """ tool_call = ToolUseBlock( id=shortuuid.uuid(), type="tool_use", name=func_name, input=params, ) tool_call_msg = Msg( "assistant", [tool_call], role="assistant", ) # get tool acting res tool_res_msg = Msg( "system", [ ToolResultBlock( type="tool_result", id=tool_call["id"], name=tool_call["name"], output=[], ), ], "system", ) tool_res = await self.toolkit.call_tool_function( tool_call, ) async for chunk in tool_res: tool_res_msg.content[0]["output"] = chunk.content return tool_call_msg, tool_res_msg async def decompose_and_expand_subtask(self) -> ToolResponse: """Identify the knowledge gaps of the current subtask and generate a working meta_planner_agent by subtask decomposition. The working meta_planner_agent includes necessary steps for task completion and expanded steps. Returns: ToolResponse: The knowledge gaps and working meta_planner_agent of the current subtask in JSON format. """ if len(self.current_subtask) <= self.max_depth: decompose_sys_prompt = self.prompt_dict["decompose_sys_prompt"] previous_plan = "" for i, subtask in enumerate(self.current_subtask): previous_plan += ( f"The {i}-th meta_planner_agent: {subtask.working_plan}\n" ) previous_plan_inst = self.prompt_dict[ "previous_plan_inst" ].format_map( { "previous_plan": previous_plan, "objective": self.current_subtask[-1].objective, }, ) try: gaps_and_plan = await self.get_model_output( msgs=[ Msg("system", decompose_sys_prompt, "system"), Msg("user", previous_plan_inst, "user"), ], format_template=SubtasksDecomposition, stream=self.model.stream, ) response = json.dumps( gaps_and_plan, indent=2, ensure_ascii=False, ) except Exception: # noqa: F841 gaps_and_plan = {} response = self.prompt_dict["retry_hint"].format_map( {"state": "decomposing the subtask"}, ) self.current_subtask[-1].knowledge_gaps = gaps_and_plan.get( "knowledge_gaps", None, ) self.current_subtask[-1].working_plan = gaps_and_plan.get( "working_plan", None, ) return ToolResponse( content=[ TextBlock( type="text", text=response, ), ], ) return ToolResponse( content=[ TextBlock( type="text", text=self.prompt_dict["max_depth_hint"], ), ], ) async def _follow_up( self, search_results: list | str, tool_call: ToolUseBlock, ) -> ToolResponse: """Read the website more intensively to mine more information for the task. And generate a follow-up subtask if necessary to perform deep search. """ if len(self.current_subtask) < self.max_depth: # Step#1: query expansion expansion_sys_prompt = self.prompt_dict["expansion_sys_prompt"] expansion_inst = self.prompt_dict["expansion_inst"].format_map( { "objective": tool_call["input"].get("query", ""), "checklist": self.current_subtask[0].knowledge_gaps, "knowledge_gaps": self.current_subtask[-1].working_plan, "search_results": search_results, }, ) try: follow_up_subtask = await self.get_model_output( msgs=[ Msg("system", expansion_sys_prompt, "system"), Msg("user", expansion_inst, "user"), ], format_template=WebExtraction, stream=self.model.stream, ) except Exception: # noqa: F841 follow_up_subtask = {} # Step #2: extract the url if follow_up_subtask.get("need_more_information", False): expansion_response_msg = Msg( "assistant", follow_up_subtask.get( "reasoning", "I need more information.", ), role="assistant", ) urls = follow_up_subtask.get("url", None) logger.info("Reading %s", urls) # call the extract_function params = { "urls": urls, "extract_depth": "basic", } ( extract_tool_use_msg, extract_tool_res_msg, ) = await self.call_specific_tool( func_name=self.extract_function, params=params, ) self.intermediate_memory.append(extract_tool_use_msg) extract_tool_res_msg.content[0][ "output" ] = truncate_search_result( extract_tool_res_msg.content[0]["output"], ) # await self.memory.add(tool_res_msg) await self.print(extract_tool_res_msg, True) self.intermediate_memory.append(extract_tool_res_msg) # Step #4: follow up judge try: follow_up_response = await self.get_model_output( msgs=[ Msg("user", expansion_inst, "user"), expansion_response_msg, extract_tool_use_msg, extract_tool_res_msg, Msg( "user", self.prompt_dict["follow_up_judge_sys_prompt"], role="user", ), ], format_template=FollowupJudge, stream=self.model.stream, ) except Exception: # noqa: F841 follow_up_response = {} if not follow_up_response.get("is_sufficient", True): subtasks = follow_up_subtask.get("subtask", None) logger.info("Figuring out %s", subtasks) intermediate_report = ( await self.summarize_intermediate_results() ) self.current_subtask.append( SubTaskItem(objective=subtasks), ) return ToolResponse( content=[ TextBlock( type="text", text=follow_up_response.get( "reasoning", self.prompt_dict["need_deeper_hint"], ), ), ], metadata={ "update_memory": True, "intermediate_report": intermediate_report, }, ) else: return ToolResponse( content=[ TextBlock( type="text", text=follow_up_response.get( "reasoning", self.prompt_dict["sufficient_hint"], ), ), ], ) else: return ToolResponse( content=[ TextBlock( type="text", text=follow_up_subtask.get( "reasoning", self.prompt_dict["sufficient_hint"], ), ), ], ) else: return ToolResponse( content=[ TextBlock( type="text", text=self.prompt_dict["max_depth_hint"], ), ], ) async def summarize_intermediate_results(self) -> ToolResponse: """Summarize the intermediate results into a report when a step in working meta_planner_agent is completed. Returns: ToolResponse: The summarized draft report. """ if len(self.intermediate_memory) == 0: return ToolResponse( content=[ TextBlock( type="text", text=self.prompt_dict["no_result_hint"], ), ], ) # agent actively call this tool if self.intermediate_memory[-1].name == self.summarize_function: blocks = await self.get_model_output( msgs=self.intermediate_memory + [ Msg( "user", self.prompt_dict["summarize_hint"].format_map( { "meta_planner_agent": self.current_subtask[ -1 ].working_plan, }, ), role="user", ), ], stream=self.model.stream, ) self.current_subtask[-1].working_plan = blocks[0][ "text" ] # type: ignore[index] report_prefix = "#" * len(self.current_subtask) summarize_sys_prompt = self.prompt_dict[ "summarize_sys_prompt" ].format_map( {"report_prefix": report_prefix}, ) # get all tool result tool_result = "" for item in self.intermediate_memory: if isinstance(item.content, str): tool_result += item.content + "\n" elif isinstance(item.content, list): for each in item.content: if each["type"] == "tool_result": tool_result += str(each) + "\n" else: logger.warning( "Unknown content type: %s!", type(item.content), ) continue summarize_instruction = self.prompt_dict["summarize_inst"].format_map( { "objective": self.current_subtask[0].objective, "knowledge_gaps": self.current_subtask[0].knowledge_gaps, "working_plan": self.current_subtask[-1].working_plan, "tool_result": tool_result, }, ) blocks = await self.get_model_output( msgs=[ Msg("system", summarize_sys_prompt, "system"), Msg("user", summarize_instruction, "user"), ], stream=self.model.stream, ) intermediate_report = blocks[0]["text"] # type: ignore[index] # Write the intermediate report intermediate_report_path = os.path.join( self.tmp_file_storage_dir, f"{self.report_path_based}_" f"inprocess_report_{self.report_index}.md", ) self.report_index += 1 params = { "file_path": intermediate_report_path, "content": intermediate_report, } await self.call_specific_tool( func_name=self.write_file_function, params=params, ) logger.info( "Storing the intermediate findings: %s", intermediate_report, ) if ( self.intermediate_memory[-1].has_content_blocks("tool_use") and self.intermediate_memory[-1].get_content_blocks("tool_use")[0][ "name" ] == self.summarize_function ): return ToolResponse( content=[ TextBlock( type="text", text=self.prompt_dict["update_report_hint"].format_map( { "intermediate_report": intermediate_report, "report_path": intermediate_report_path, }, ), ), ], ) else: return ToolResponse( content=[ TextBlock( type="text", text=self.prompt_dict["save_report_hint"].format_map( { "intermediate_report": intermediate_report, }, ), ), ], ) async def _generate_deepresearch_report( self, checklist: str, ) -> Tuple[Msg, str]: """Collect and polish all draft reports into a final report. Args: checklist (`str`): The expected output items of the original task. """ reporting_sys_prompt = self.prompt_dict["reporting_sys_prompt"] reporting_sys_prompt.format_map( { "original_task": self.user_query, "checklist": checklist, }, ) # Collect all intermediate reports if self.report_index > 1: inprocess_report = "" for index in range(self.report_index): params = { "file_path": os.path.join( self.tmp_file_storage_dir, f"{self.report_path_based}_" f"inprocess_report_{index + 1}.md", ), } _, read_draft_tool_res_msg = await self.call_specific_tool( func_name=self.read_file_function, params=params, ) inprocess_report += ( read_draft_tool_res_msg.content[0]["output"][0]["text"] + "\n" ) msgs = [ Msg( "system", content=reporting_sys_prompt, role="system", ), Msg( "user", content=f"Draft report:\n{inprocess_report}", role="user", ), ] else: # Use only intermediate memory to generate report msgs = [ Msg( "system", content=reporting_sys_prompt, role="system", ), ] + self.intermediate_memory blocks = await self.get_model_output( msgs=msgs, stream=self.model.stream, ) final_report_content = blocks[0]["text"] # type: ignore[index] logger.info( "The final Report is generated: %s", final_report_content, ) # Write the final report into a file detailed_report_path = os.path.join( self.tmp_file_storage_dir, f"{self.report_path_based}_detailed_report.md", ) params = { "file_path": detailed_report_path, "content": final_report_content, } _, write_report_tool_res_msg = await self.call_specific_tool( func_name=self.write_file_function, params=params, ) return write_report_tool_res_msg, detailed_report_path async def _summarizing(self) -> Msg: """Generate a report based on the exsisting findings when the agent fails to solve the problem in the maximum iterations.""" ( summarized_content, _, ) = await self._generate_deepresearch_report( checklist=self.current_subtask[0].knowledge_gaps, ) return Msg( name=self.name, role="assistant", content=json.dumps( summarized_content.content[0]["output"][0], indent=2, ensure_ascii=False, ), ) async def reflect_failure(self) -> ToolResponse: """Reflect on the failure of the action and determine to rephrase the meta_planner_agent or deeper decompose the current step. Returns: ToolResponse: The reflection about meta_planner_agent rephrasing and subtask decomposition. """ reflect_sys_prompt = self.prompt_dict["reflect_sys_prompt"] conversation_history = "" for msg in self.intermediate_memory: conversation_history += ( json.dumps( {"role": "user", "content": msg.content}, ensure_ascii=False, indent=2, ) + "\n" ) reflect_inst = self.prompt_dict["reflect_instruction"].format_map( { "conversation_history": conversation_history, "meta_planner_agent": self.current_subtask[-1].working_plan, }, ) try: reflection = await self.get_model_output( msgs=[ Msg("system", reflect_sys_prompt, "system"), Msg("user", reflect_inst, "user"), ], format_template=ReflectFailure, stream=self.model.stream, ) response = json.dumps( reflection, indent=2, ensure_ascii=False, ) except Exception: # noqa: F841 reflection = {} response = self.prompt_dict["retry_hint"].format_map( {"state": "making the reflection"}, ) if reflection.get("rephrase_subtask", False) and reflection[ "rephrase_subtask" ].get( "need_rephrase", False, ): # type: ignore[index] self.current_subtask[-1].working_plan = reflection[ "rephrase_subtask" ][ "rephrased_plan" ] # type: ignore[index] elif reflection.get("decompose_subtask", False) and reflection[ "decompose_subtask" ].get( "need_decompose", False, ): # type: ignore[index] if len(self.current_subtask) <= self.max_depth: intermediate_report = ( await self.summarize_intermediate_results() ) self.current_subtask.append( SubTaskItem( objective=reflection[ "decompose_subtask" ].get( # type: ignore[index] "failed_subtask", None, ), ), ) return ToolResponse( content=[ TextBlock( type="text", text=response, ), ], metadata={ "update_memory": True, "intermediate_report": intermediate_report, }, ) else: return ToolResponse( content=[ TextBlock( type="text", text=self.prompt_dict["max_depth_hint"], ), ], ) else: pass return ToolResponse( content=[ TextBlock( type="text", text=response, ), ], ) # pylint: disable=invalid-overridden-method, unused-argument async def generate_response( # self, response: str, **_kwargs: Any, ) -> ToolResponse: """Generate a detailed report as a response. Besides, when calling this function, the reasoning-acting memory will be cleared, so your response should contain a brief summary of what you have done so far. Args: response (`str`): Your response to the user. """ checklist = self.current_subtask[0].knowledge_gaps completed_subtask = self.current_subtask.pop() if len(self.current_subtask) == 0: ( summarized_content, _, ) = await self._generate_deepresearch_report( checklist=checklist, ) response_msg = Msg( name=self.name, role="assistant", content=json.dumps( summarized_content.content[0]["output"][0], indent=2, ensure_ascii=False, ), ) return ToolResponse( content=[ TextBlock( type="text", text="Successfully generated detailed report.", ), ], metadata={ "success": True, "response_msg": response_msg, }, is_last=True, ) else: return ToolResponse( content=[ TextBlock( type="text", text=self.prompt_dict[ "subtask_complete_hint" ].format_map( { "cur_obj": completed_subtask.objective, "next_obj": self.current_subtask[-1].objective, }, ), ), ], metadata={ "success": True, }, is_last=True, )