Files
evotraders/deep_research/agent_deep_research/deep_research_agent.py
raykkk 7d0451131f init
2025-10-17 21:40:45 +08:00

1120 lines
39 KiB
Python

# -*- coding: utf-8 -*-
"""Deep Research Agent"""
# pylint: disable=too-many-lines, no-name-in-module
import asyncio
import json
import os
from copy import deepcopy
from datetime import datetime
from typing import Any, Optional, Tuple, Type
import shortuuid
from agentscope import logger, setup_logger
from agentscope.agent import ReActAgent
from agentscope.formatter import FormatterBase
from agentscope.mcp import StatefulClientBase
from agentscope.memory import MemoryBase
from agentscope.message import Msg, TextBlock, ToolResultBlock, ToolUseBlock
from agentscope.model import ChatModelBase
from agentscope.tool import ToolResponse, view_text_file, write_text_file
from pydantic import BaseModel
from ..agent_deep_research.utils import (
get_dynamic_tool_call_json,
get_structure_output,
load_prompt_dict,
truncate_search_result,
)
from .built_in_prompt.promptmodule import (
FollowupJudge,
ReflectFailure,
SubtasksDecomposition,
WebExtraction,
)
_DEEP_RESEARCH_AGENT_DEFAULT_SYS_PROMPT = "You're a helpful assistant."
_LOG_DIR = os.path.join(os.path.dirname(__file__), "log")
_LOG_PATH = os.path.join(
_LOG_DIR,
f"log_{datetime.now().strftime('%y%m%d%H%M%S')}.md",
)
os.makedirs(_LOG_DIR, exist_ok=True)
setup_logger(level="INFO", filepath=_LOG_PATH)
class SubTaskItem(BaseModel):
"""Subtask item of deep research agent."""
objective: str
working_plan: Optional[str] = None
knowledge_gaps: Optional[str] = None
class DeepResearchAgent(ReActAgent):
"""
Deep Research Agent for sophisticated research tasks.
Example:
.. code-block:: python
agent = DeepResearchAgent(
name="Friday",
sys_prompt="You are a helpful assistant named Friday.",
model=my_chat_model,
formatter=my_chat_formatter,
memory=InMemoryMemory(),
search_mcp_client=my_tavily_search_client,
tmp_file_storage_dir=agent_working_dir,
)
response = await agent(
Msg(
name=“user”,
content="Please give me a survey of the LLM-empowered agent.",
role=“user”
)
)
```
"""
def __init__(
self,
name: str,
model: ChatModelBase,
formatter: FormatterBase,
memory: MemoryBase,
search_mcp_client: StatefulClientBase,
sys_prompt: str = _DEEP_RESEARCH_AGENT_DEFAULT_SYS_PROMPT,
max_iters: int = 30,
max_depth: int = 3,
tmp_file_storage_dir: str = "tmp",
) -> None:
"""Initialize the Deep Research Agent.
Args:
name (str):
The unique identifier name for the agent instance.
model (ChatModelBase):
The chat model used for generating responses and reasoning.
formatter (FormatterBase):
The formatter used to convert messages into the required
format for the model API.
memory (MemoryBase):
The memory component used to store and retrieve dialogue
history.
search_mcp_client (StatefulClientBase):
The mcp client used to provide the tools for deep search.
sys_prompt (str, optional):
The system prompt that defines the agent's behavior
and personality.
Defaults to _DEEP_RESEARCH_AGENT_DEFAULT_SYS_PROMPT.
max_iters (int, optional):
The maximum number of reasoning-acting loop iterations.
Defaults to 30.
max_depth (int, optional):
The maximum depth of query expansion during deep searching.
Defaults to 3.
tmp_file_storage_dir (str, optional):
The storage dir for generated files.
Default to 'tmp'
Returns:
None
"""
# initialization of prompts
self.prompt_dict = load_prompt_dict()
# Enhance the system prompt for deep research agent
add_note = self.prompt_dict["add_note"].format_map(
{"finish_function_name": f"`{self.finish_function_name}`"},
)
tool_use_rule = self.prompt_dict["tool_use_rule"].format_map(
{"tmp_file_storage_dir": tmp_file_storage_dir},
)
sys_prompt = f"{sys_prompt}\n{add_note}\n{tool_use_rule}"
super().__init__(
name=name,
sys_prompt=sys_prompt,
model=model,
formatter=formatter,
memory=memory,
max_iters=max_iters,
)
self.max_depth = max_depth
self.memory = memory
self.tmp_file_storage_dir = tmp_file_storage_dir
self.current_subtask = []
# register all necessary tools for deep research agent
self.toolkit.register_tool_function(view_text_file)
self.toolkit.register_tool_function(write_text_file)
asyncio.create_task(
self.toolkit.register_mcp_client(search_mcp_client),
)
self.search_function = "tavily-search"
self.extract_function = "tavily-extract"
self.read_file_function = "view_text_file"
self.write_file_function = "write_text_file"
self.summarize_function = "summarize_intermediate_results"
self.intermediate_memory = []
self.report_path_based = self.name + datetime.now().strftime(
"%y%m%d%H%M%S",
)
self.report_index = 1
self._required_structured_model = None
self.user_query = None
# add functions into toolkit
self.toolkit.register_tool_function(self.reflect_failure)
self.toolkit.register_tool_function(
self.summarize_intermediate_results,
)
async def reply(
self,
msg: Msg | list[Msg] | None = None,
structured_model: Type[BaseModel] | None = None,
) -> Msg:
"""The reply method of the agent."""
# Maintain the subtask list
self.user_query = msg.get_text_content()
self.current_subtask.append(
SubTaskItem(objective=self.user_query),
)
# Identify the expected output and generate a meta_planner_agent
await self.decompose_and_expand_subtask()
msg.content += (
f"\nExpected Output:\n{self.current_subtask[0].knowledge_gaps}"
)
# Add user query message to memory
await self.memory.add(msg) # type: ignore
# Record structured output model if provided
if structured_model:
self._required_structured_model = structured_model
self.toolkit.set_extended_model(
self.finish_function_name,
structured_model,
)
for _ in range(self.max_iters):
# Generate the working meta_planner_agent first
if not self.current_subtask[-1].working_plan:
await self.decompose_and_expand_subtask()
# Write the instruction for reasoning
cur_plan = self.current_subtask[-1].working_plan
cur_know_gap = self.current_subtask[-1].knowledge_gaps
reasoning_prompt = self.prompt_dict["reasoning_prompt"].format_map(
{
"objective": self.current_subtask[-1].objective,
"meta_planner_agent": (
cur_plan
if cur_plan
else "There is no working meta_planner_agent now."
),
"knowledge_gap": (
f"## Knowledge Gaps:\n {cur_know_gap}"
if cur_know_gap
else ""
),
"depth": len(self.current_subtask),
},
)
reasoning_prompt_msg = Msg(
"user",
content=[
TextBlock(
type="text",
text=reasoning_prompt,
),
],
role="user",
)
self.intermediate_memory.append(reasoning_prompt_msg)
# Reasoning to generate tool calls
backup_memory = deepcopy(self.memory) # type: ignore
await self.memory.add(self.intermediate_memory) # type: ignore
msg_reasoning = await self._reasoning()
self.memory = backup_memory
# Calling the tools
for tool_call in msg_reasoning.get_content_blocks("tool_use"):
self.intermediate_memory.append(
Msg(
self.name,
content=[tool_call],
role="assistant",
),
) # add tool_use memory
msg_response = await self._acting(tool_call)
if msg_response:
await self.memory.add(msg_response)
self.current_subtask = []
return msg_response
# When the maximum iterations are reached, summarize all the findings
return await self._summarizing()
async def _acting(self, tool_call: ToolUseBlock) -> Msg | None:
"""
Execute a tool call and process its response with browser-specific
handling.
Args:
tool_call (ToolUseBlock):
The tool use block containing the tool name, parameters,
and unique identifier for execution.
Returns:
Msg | None:
Returns a response message if the finish function is called
successfully, otherwise returns None to continue the
reasoning-acting loop.
"""
tool_res_msg = Msg(
"system",
[
ToolResultBlock(
type="tool_result",
id=tool_call["id"],
name=tool_call["name"],
output=[],
),
],
"system",
)
update_memory = False
intermediate_report = ""
chunk = ""
try:
# Execute the tool call
tool_res = await self.toolkit.call_tool_function(tool_call)
# Async generator handling
async for chunk in tool_res:
# Turn into a tool result block
tool_res_msg.content[0]["output"] = chunk.content # type: ignore[index]
# Skip the printing of the finish function call
if (
tool_call["name"] != self.finish_function_name
or tool_call["name"] == self.finish_function_name
and not chunk.metadata.get("success")
):
await self.print(tool_res_msg, chunk.is_last)
# Return message if generate_response is called successfully
if tool_call[
"name"
] == self.finish_function_name and chunk.metadata.get(
"success",
True,
):
if len(self.current_subtask) == 0:
return chunk.metadata.get("response_msg")
# Summarize intermediate results into a draft report
elif tool_call["name"] == self.summarize_function:
self.intermediate_memory = []
await self.memory.add(
Msg(
"assistant",
[
TextBlock(
type="text",
text=chunk.content[0]["text"],
),
],
"assistant",
),
)
# Truncate the web extract results that exceeds max length
elif tool_call["name"] in [
self.search_function,
self.extract_function,
]:
tool_res_msg.content[0]["output"] = truncate_search_result(
tool_res_msg.content[0]["output"],
)
# Update memory when an intermediate report is generated
if isinstance(chunk.metadata, dict) and chunk.metadata.get(
"update_memory",
):
update_memory = True
intermediate_report = chunk.metadata.get(
"intermediate_report",
)
return None
finally:
# Record the tool result message in the intermediate memory
if tool_call["name"] != self.summarize_function:
self.intermediate_memory.append(tool_res_msg)
# Read more information from the web page if necessary
if tool_call["name"] == self.search_function:
extract_res = await self._follow_up(chunk.content, tool_call)
if isinstance(
extract_res.metadata,
dict,
) and extract_res.metadata.get("update_memory"):
self.intermediate_memory = []
await self.memory.add(
Msg(
"assistant",
content=[
TextBlock(
type="text",
text=extract_res.metadata.get(
"intermediate_report",
).content[0]["text"],
),
],
role="assistant",
),
)
# Update memory with the intermediate report
if update_memory:
self.intermediate_memory = []
await self.memory.add(
Msg(
"assistant",
content=[
TextBlock(
type="text",
text=intermediate_report.content[0]["text"],
),
],
role="assistant",
),
)
async def get_model_output(
self,
msgs: list,
format_template: Type[BaseModel] = None,
stream: bool = True,
) -> Any:
"""
Call the model and get output with or without a structured format.
Args:
msgs (list): A list of messages.
format_template (BaseModel): structured format.
stream (bool): stream-style output.
"""
blocks = None
if format_template:
res = await self.model(
await self.formatter.format(msgs=msgs),
tools=get_dynamic_tool_call_json(
format_template,
),
)
if stream:
async for content_chunk in res:
blocks = content_chunk.content
else:
blocks = res.content
return get_structure_output(blocks)
else:
res = await self.model(
await self.formatter.format(msgs=msgs),
)
if stream:
async for content_chunk in res:
blocks = content_chunk.content
else:
blocks = res.content
return blocks
async def call_specific_tool(
self,
func_name: str,
params: dict = None,
) -> Tuple[Msg, Msg]:
"""
Call the specific tool in toolkit.
Args:
func_name (str): name of the tool.
params (dict): input parameters of the tool.
"""
tool_call = ToolUseBlock(
id=shortuuid.uuid(),
type="tool_use",
name=func_name,
input=params,
)
tool_call_msg = Msg(
"assistant",
[tool_call],
role="assistant",
)
# get tool acting res
tool_res_msg = Msg(
"system",
[
ToolResultBlock(
type="tool_result",
id=tool_call["id"],
name=tool_call["name"],
output=[],
),
],
"system",
)
tool_res = await self.toolkit.call_tool_function(
tool_call,
)
async for chunk in tool_res:
tool_res_msg.content[0]["output"] = chunk.content
return tool_call_msg, tool_res_msg
async def decompose_and_expand_subtask(self) -> ToolResponse:
"""Identify the knowledge gaps of the current subtask and generate a
working meta_planner_agent by subtask decomposition. The working meta_planner_agent includes
necessary steps for task completion and expanded steps.
Returns:
ToolResponse:
The knowledge gaps and working meta_planner_agent of the current subtask
in JSON format.
"""
if len(self.current_subtask) <= self.max_depth:
decompose_sys_prompt = self.prompt_dict["decompose_sys_prompt"]
previous_plan = ""
for i, subtask in enumerate(self.current_subtask):
previous_plan += (
f"The {i}-th meta_planner_agent: {subtask.working_plan}\n"
)
previous_plan_inst = self.prompt_dict[
"previous_plan_inst"
].format_map(
{
"previous_plan": previous_plan,
"objective": self.current_subtask[-1].objective,
},
)
try:
gaps_and_plan = await self.get_model_output(
msgs=[
Msg("system", decompose_sys_prompt, "system"),
Msg("user", previous_plan_inst, "user"),
],
format_template=SubtasksDecomposition,
stream=self.model.stream,
)
response = json.dumps(
gaps_and_plan,
indent=2,
ensure_ascii=False,
)
except Exception: # noqa: F841
gaps_and_plan = {}
response = self.prompt_dict["retry_hint"].format_map(
{"state": "decomposing the subtask"},
)
self.current_subtask[-1].knowledge_gaps = gaps_and_plan.get(
"knowledge_gaps",
None,
)
self.current_subtask[-1].working_plan = gaps_and_plan.get(
"working_plan",
None,
)
return ToolResponse(
content=[
TextBlock(
type="text",
text=response,
),
],
)
return ToolResponse(
content=[
TextBlock(
type="text",
text=self.prompt_dict["max_depth_hint"],
),
],
)
async def _follow_up(
self,
search_results: list | str,
tool_call: ToolUseBlock,
) -> ToolResponse:
"""Read the website more intensively to mine more information for
the task. And generate a follow-up subtask if necessary to perform
deep search.
"""
if len(self.current_subtask) < self.max_depth:
# Step#1: query expansion
expansion_sys_prompt = self.prompt_dict["expansion_sys_prompt"]
expansion_inst = self.prompt_dict["expansion_inst"].format_map(
{
"objective": tool_call["input"].get("query", ""),
"checklist": self.current_subtask[0].knowledge_gaps,
"knowledge_gaps": self.current_subtask[-1].working_plan,
"search_results": search_results,
},
)
try:
follow_up_subtask = await self.get_model_output(
msgs=[
Msg("system", expansion_sys_prompt, "system"),
Msg("user", expansion_inst, "user"),
],
format_template=WebExtraction,
stream=self.model.stream,
)
except Exception: # noqa: F841
follow_up_subtask = {}
# Step #2: extract the url
if follow_up_subtask.get("need_more_information", False):
expansion_response_msg = Msg(
"assistant",
follow_up_subtask.get(
"reasoning",
"I need more information.",
),
role="assistant",
)
urls = follow_up_subtask.get("url", None)
logger.info("Reading %s", urls)
# call the extract_function
params = {
"urls": urls,
"extract_depth": "basic",
}
(
extract_tool_use_msg,
extract_tool_res_msg,
) = await self.call_specific_tool(
func_name=self.extract_function,
params=params,
)
self.intermediate_memory.append(extract_tool_use_msg)
extract_tool_res_msg.content[0][
"output"
] = truncate_search_result(
extract_tool_res_msg.content[0]["output"],
)
# await self.memory.add(tool_res_msg)
await self.print(extract_tool_res_msg, True)
self.intermediate_memory.append(extract_tool_res_msg)
# Step #4: follow up judge
try:
follow_up_response = await self.get_model_output(
msgs=[
Msg("user", expansion_inst, "user"),
expansion_response_msg,
extract_tool_use_msg,
extract_tool_res_msg,
Msg(
"user",
self.prompt_dict["follow_up_judge_sys_prompt"],
role="user",
),
],
format_template=FollowupJudge,
stream=self.model.stream,
)
except Exception: # noqa: F841
follow_up_response = {}
if not follow_up_response.get("is_sufficient", True):
subtasks = follow_up_subtask.get("subtask", None)
logger.info("Figuring out %s", subtasks)
intermediate_report = (
await self.summarize_intermediate_results()
)
self.current_subtask.append(
SubTaskItem(objective=subtasks),
)
return ToolResponse(
content=[
TextBlock(
type="text",
text=follow_up_response.get(
"reasoning",
self.prompt_dict["need_deeper_hint"],
),
),
],
metadata={
"update_memory": True,
"intermediate_report": intermediate_report,
},
)
else:
return ToolResponse(
content=[
TextBlock(
type="text",
text=follow_up_response.get(
"reasoning",
self.prompt_dict["sufficient_hint"],
),
),
],
)
else:
return ToolResponse(
content=[
TextBlock(
type="text",
text=follow_up_subtask.get(
"reasoning",
self.prompt_dict["sufficient_hint"],
),
),
],
)
else:
return ToolResponse(
content=[
TextBlock(
type="text",
text=self.prompt_dict["max_depth_hint"],
),
],
)
async def summarize_intermediate_results(self) -> ToolResponse:
"""Summarize the intermediate results into a report when a step
in working meta_planner_agent is completed.
Returns:
ToolResponse:
The summarized draft report.
"""
if len(self.intermediate_memory) == 0:
return ToolResponse(
content=[
TextBlock(
type="text",
text=self.prompt_dict["no_result_hint"],
),
],
)
# agent actively call this tool
if self.intermediate_memory[-1].name == self.summarize_function:
blocks = await self.get_model_output(
msgs=self.intermediate_memory
+ [
Msg(
"user",
self.prompt_dict["summarize_hint"].format_map(
{
"meta_planner_agent": self.current_subtask[
-1
].working_plan,
},
),
role="user",
),
],
stream=self.model.stream,
)
self.current_subtask[-1].working_plan = blocks[0][
"text"
] # type: ignore[index]
report_prefix = "#" * len(self.current_subtask)
summarize_sys_prompt = self.prompt_dict[
"summarize_sys_prompt"
].format_map(
{"report_prefix": report_prefix},
)
# get all tool result
tool_result = ""
for item in self.intermediate_memory:
if isinstance(item.content, str):
tool_result += item.content + "\n"
elif isinstance(item.content, list):
for each in item.content:
if each["type"] == "tool_result":
tool_result += str(each) + "\n"
else:
logger.warning(
"Unknown content type: %s!",
type(item.content),
)
continue
summarize_instruction = self.prompt_dict["summarize_inst"].format_map(
{
"objective": self.current_subtask[0].objective,
"knowledge_gaps": self.current_subtask[0].knowledge_gaps,
"working_plan": self.current_subtask[-1].working_plan,
"tool_result": tool_result,
},
)
blocks = await self.get_model_output(
msgs=[
Msg("system", summarize_sys_prompt, "system"),
Msg("user", summarize_instruction, "user"),
],
stream=self.model.stream,
)
intermediate_report = blocks[0]["text"] # type: ignore[index]
# Write the intermediate report
intermediate_report_path = os.path.join(
self.tmp_file_storage_dir,
f"{self.report_path_based}_"
f"inprocess_report_{self.report_index}.md",
)
self.report_index += 1
params = {
"file_path": intermediate_report_path,
"content": intermediate_report,
}
await self.call_specific_tool(
func_name=self.write_file_function,
params=params,
)
logger.info(
"Storing the intermediate findings: %s",
intermediate_report,
)
if (
self.intermediate_memory[-1].has_content_blocks("tool_use")
and self.intermediate_memory[-1].get_content_blocks("tool_use")[0][
"name"
]
== self.summarize_function
):
return ToolResponse(
content=[
TextBlock(
type="text",
text=self.prompt_dict["update_report_hint"].format_map(
{
"intermediate_report": intermediate_report,
"report_path": intermediate_report_path,
},
),
),
],
)
else:
return ToolResponse(
content=[
TextBlock(
type="text",
text=self.prompt_dict["save_report_hint"].format_map(
{
"intermediate_report": intermediate_report,
},
),
),
],
)
async def _generate_deepresearch_report(
self,
checklist: str,
) -> Tuple[Msg, str]:
"""Collect and polish all draft reports into a final report.
Args:
checklist (`str`):
The expected output items of the original task.
"""
reporting_sys_prompt = self.prompt_dict["reporting_sys_prompt"]
reporting_sys_prompt.format_map(
{
"original_task": self.user_query,
"checklist": checklist,
},
)
# Collect all intermediate reports
if self.report_index > 1:
inprocess_report = ""
for index in range(self.report_index):
params = {
"file_path": os.path.join(
self.tmp_file_storage_dir,
f"{self.report_path_based}_"
f"inprocess_report_{index + 1}.md",
),
}
_, read_draft_tool_res_msg = await self.call_specific_tool(
func_name=self.read_file_function,
params=params,
)
inprocess_report += (
read_draft_tool_res_msg.content[0]["output"][0]["text"]
+ "\n"
)
msgs = [
Msg(
"system",
content=reporting_sys_prompt,
role="system",
),
Msg(
"user",
content=f"Draft report:\n{inprocess_report}",
role="user",
),
]
else: # Use only intermediate memory to generate report
msgs = [
Msg(
"system",
content=reporting_sys_prompt,
role="system",
),
] + self.intermediate_memory
blocks = await self.get_model_output(
msgs=msgs,
stream=self.model.stream,
)
final_report_content = blocks[0]["text"] # type: ignore[index]
logger.info(
"The final Report is generated: %s",
final_report_content,
)
# Write the final report into a file
detailed_report_path = os.path.join(
self.tmp_file_storage_dir,
f"{self.report_path_based}_detailed_report.md",
)
params = {
"file_path": detailed_report_path,
"content": final_report_content,
}
_, write_report_tool_res_msg = await self.call_specific_tool(
func_name=self.write_file_function,
params=params,
)
return write_report_tool_res_msg, detailed_report_path
async def _summarizing(self) -> Msg:
"""Generate a report based on the exsisting findings when the
agent fails to solve the problem in the maximum iterations."""
(
summarized_content,
_,
) = await self._generate_deepresearch_report(
checklist=self.current_subtask[0].knowledge_gaps,
)
return Msg(
name=self.name,
role="assistant",
content=json.dumps(
summarized_content.content[0]["output"][0],
indent=2,
ensure_ascii=False,
),
)
async def reflect_failure(self) -> ToolResponse:
"""Reflect on the failure of the action and determine to rephrase
the meta_planner_agent or deeper decompose the current step.
Returns:
ToolResponse:
The reflection about meta_planner_agent rephrasing and subtask decomposition.
"""
reflect_sys_prompt = self.prompt_dict["reflect_sys_prompt"]
conversation_history = ""
for msg in self.intermediate_memory:
conversation_history += (
json.dumps(
{"role": "user", "content": msg.content},
ensure_ascii=False,
indent=2,
)
+ "\n"
)
reflect_inst = self.prompt_dict["reflect_instruction"].format_map(
{
"conversation_history": conversation_history,
"meta_planner_agent": self.current_subtask[-1].working_plan,
},
)
try:
reflection = await self.get_model_output(
msgs=[
Msg("system", reflect_sys_prompt, "system"),
Msg("user", reflect_inst, "user"),
],
format_template=ReflectFailure,
stream=self.model.stream,
)
response = json.dumps(
reflection,
indent=2,
ensure_ascii=False,
)
except Exception: # noqa: F841
reflection = {}
response = self.prompt_dict["retry_hint"].format_map(
{"state": "making the reflection"},
)
if reflection.get("rephrase_subtask", False) and reflection[
"rephrase_subtask"
].get(
"need_rephrase",
False,
): # type: ignore[index]
self.current_subtask[-1].working_plan = reflection[
"rephrase_subtask"
][
"rephrased_plan"
] # type: ignore[index]
elif reflection.get("decompose_subtask", False) and reflection[
"decompose_subtask"
].get(
"need_decompose",
False,
): # type: ignore[index]
if len(self.current_subtask) <= self.max_depth:
intermediate_report = (
await self.summarize_intermediate_results()
)
self.current_subtask.append(
SubTaskItem(
objective=reflection[
"decompose_subtask"
].get( # type: ignore[index]
"failed_subtask",
None,
),
),
)
return ToolResponse(
content=[
TextBlock(
type="text",
text=response,
),
],
metadata={
"update_memory": True,
"intermediate_report": intermediate_report,
},
)
else:
return ToolResponse(
content=[
TextBlock(
type="text",
text=self.prompt_dict["max_depth_hint"],
),
],
)
else:
pass
return ToolResponse(
content=[
TextBlock(
type="text",
text=response,
),
],
)
# pylint: disable=invalid-overridden-method, unused-argument
async def generate_response( #
self,
response: str,
**_kwargs: Any,
) -> ToolResponse:
"""Generate a detailed report as a response.
Besides, when calling this function, the reasoning-acting memory will
be cleared, so your response should contain a brief summary of what
you have done so far.
Args:
response (`str`):
Your response to the user.
"""
checklist = self.current_subtask[0].knowledge_gaps
completed_subtask = self.current_subtask.pop()
if len(self.current_subtask) == 0:
(
summarized_content,
_,
) = await self._generate_deepresearch_report(
checklist=checklist,
)
response_msg = Msg(
name=self.name,
role="assistant",
content=json.dumps(
summarized_content.content[0]["output"][0],
indent=2,
ensure_ascii=False,
),
)
return ToolResponse(
content=[
TextBlock(
type="text",
text="Successfully generated detailed report.",
),
],
metadata={
"success": True,
"response_msg": response_msg,
},
is_last=True,
)
else:
return ToolResponse(
content=[
TextBlock(
type="text",
text=self.prompt_dict[
"subtask_complete_hint"
].format_map(
{
"cur_obj": completed_subtask.objective,
"next_obj": self.current_subtask[-1].objective,
},
),
),
],
metadata={
"success": True,
},
is_last=True,
)