Files
evotraders/deep_research/agent_deep_research/utils.py
2025-11-07 10:00:43 +08:00

327 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""The utilities for deep research agent"""
import os
import json
import re
from typing import Union, Sequence, Any, Type
from pydantic import BaseModel
from agentscope.tool import Toolkit, ToolResponse
TOOL_RESULTS_MAX_WORDS = 5000
def get_prompt_from_file(
    file_path: str,
    return_json: bool,
) -> Union[str, dict]:
    """Load a prompt from a file on disk.

    Args:
        file_path: Path to the prompt file.
        return_json: If True, parse the file content as JSON and return
            the resulting dict; otherwise return the raw text.

    Returns:
        The parsed JSON object when ``return_json`` is True, else the
        file content as a string.
    """
    # The original wrapped file_path in a redundant single-argument
    # os.path.join(); opening the path directly is equivalent.
    with open(file_path, "r", encoding="utf-8") as f:
        return json.load(f) if return_json else f.read()
def truncate_by_words(
    sentence: str,
    max_words: Union[int, None] = None,
) -> str:
    """Truncate overly long text to at most ``max_words`` word tokens.

    The text is tokenized into word tokens (``\\w+``) and single
    punctuation tokens; up to ``max_words`` word tokens are kept
    (punctuation between kept words is retained), then the tokens are
    re-joined with a single space before each word and no space before
    punctuation.

    Note:
        Original whitespace (e.g. newlines, runs of spaces) is not
        preserved — the output is re-joined with single spaces.

    Args:
        sentence: The text to truncate.
        max_words: Maximum number of word tokens to keep. Defaults to
            ``TOOL_RESULTS_MAX_WORDS`` when omitted, preserving the
            original behavior.

    Returns:
        The truncated, re-joined text.
    """
    limit = TOOL_RESULTS_MAX_WORDS if max_words is None else max_words
    tokens = re.findall(r"\w+|[^\w\s]", sentence, re.UNICODE)
    kept = []
    word_count = 0
    for token in tokens:
        if re.match(r"\w+", token):
            word_count += 1
            if word_count > limit:
                # Stop before appending the word that exceeds the budget.
                break
        kept.append(token)
    # Re-join: a space precedes word tokens (except at the start),
    # punctuation attaches directly to the preceding token.
    pieces = []
    for i, token in enumerate(kept):
        if i > 0 and re.match(r"\w+", token):
            pieces.append(" ")
        pieces.append(token)
    return "".join(pieces)
def truncate_search_result(
    res: list,
    search_func: str = "tavily-search",
    extract_function: str = "tavily-extract",
) -> list:
    """Truncate the ``"text"`` field of each search result in place.

    Only the default tavily search/extract pair is supported; any other
    combination raises NotImplementedError.

    Args:
        res: List of result dicts, each carrying a ``"text"`` entry.
        search_func: Name of the search tool that produced the results.
        extract_function: Name of the extract tool that produced the
            results.

    Returns:
        The same list, with each ``"text"`` value truncated by words.
    """
    supported = ("tavily-search", "tavily-extract")
    if (search_func, extract_function) != supported:
        raise NotImplementedError(
            "Specific implementation of truncation should be provided.",
        )
    for entry in res:
        entry["text"] = truncate_by_words(entry["text"])
    return res
def generate_structure_output(**kwargs: Any) -> ToolResponse:
    """Wrap arbitrary keyword arguments into a structured ToolResponse.

    Registered as a tool function: the model's structured output arrives
    as keyword arguments and is passed through unchanged as response
    metadata, with no textual content.

    Args:
        **kwargs: Fields that should match the expected structured-output
            specification, as detailed as the calling context requests.

    Returns:
        ToolResponse: A response with empty content whose metadata holds
        the provided keyword arguments.
    """
    structured_fields = dict(kwargs)
    return ToolResponse(content=[], metadata=structured_fields)
def get_dynamic_tool_call_json(data_model_type: Type[BaseModel]) -> list[dict]:
    """Build JSON schemas for dynamic tool calling from a Pydantic model.

    A throwaway Toolkit is created, ``generate_structure_output`` is
    registered on it, and the toolkit's extended model is set to the
    given data model so the emitted schema reflects the model's fields.

    Args:
        data_model_type: Pydantic BaseModel subclass defining the
            expected structure of the tool output.

    Returns:
        A list of JSON-schema dicts for the configured tool, suitable
        for APIs that support structured outputs.

    Example:
        class MyModel(BaseModel):
            name: str
            value: int
        schema = get_dynamic_tool_call_json(MyModel)
    """
    toolkit = Toolkit()
    toolkit.register_tool_function(generate_structure_output)
    toolkit.set_extended_model("generate_structure_output", data_model_type)
    return toolkit.get_json_schemas()
def get_structure_output(blocks: list | Sequence) -> dict:
"""Extract structured output from a sequence of blocks.
Processes a list or sequence of blocks to extract tool use outputs
and combine them into a single dictionary. This is typically used
to parse responses from language models that include tool calls.
Args:
blocks: A list or sequence of blocks that may contain tool use
information. Each block should be a dictionary with 'type'
and 'input' keys for tool use blocks.
Returns:
A dictionary containing the combined input data from all tool
use blocks found in the input sequence.
Example:
blocks = [
{"type": "tool_use", "input": {"name": "test"}},
{"type": "text", "content": "Some text"},
{"type": "tool_use", "input": {"value": 42}}
]
result = PromptBase.get_structure_output(blocks)
# result: {"name": "test", "value": 42}
"""
dict_output = {}
for block in blocks:
if isinstance(block, dict) and block.get("type") == "tool_use":
dict_output.update(block.get("input", {}))
return dict_output
def load_prompt_dict() -> dict:
    """Load all built-in prompts into a single dict.

    File-backed system prompts are read from the ``built_in_prompt``
    directory next to this module; the remaining entries are inline
    template strings with ``str.format`` placeholders.

    Returns:
        Mapping from prompt key to prompt text.
    """
    cur_dir = os.path.dirname(os.path.abspath(__file__))
    # Prompt key -> file name under built_in_prompt/.
    file_backed_prompts = {
        "add_note": "prompt_worker_additional_sys_prompt.md",
        "tool_use_rule": "prompt_tool_usage_rules.md",
        "decompose_sys_prompt": "prompt_decompose_subtask.md",
        "expansion_sys_prompt": "prompt_deeper_expansion.md",
        "summarize_sys_prompt": "prompt_inprocess_report.md",
        "reporting_sys_prompt": "prompt_deepresearch_summary_report.md",
        "reflect_sys_prompt": "prompt_reflect_failure.md",
    }
    prompt_dict = {
        key: get_prompt_from_file(
            file_path=os.path.join(cur_dir, "built_in_prompt", file_name),
            return_json=False,
        )
        for key, file_name in file_backed_prompts.items()
    }
    prompt_dict["reasoning_prompt"] = (
        "## Current Subtask:\n{objective}\n"
        "## Working Plan:\n{plan}\n"
        "{knowledge_gap}\n"
        "## Research Depth:\n{depth}"
    )
    prompt_dict["previous_plan_inst"] = (
        "## Previous Plan:\n{previous_plan}\n"
        "## Current Subtask:\n{objective}\n"
    )
    prompt_dict["max_depth_hint"] = (
        "The search depth has reached the maximum limit. So the "
        "current subtask can not be further decomposed and "
        "expanded anymore. I need to find another way to get it "
        "done no matter what."
    )
    prompt_dict["expansion_inst"] = (
        "Review the web search results and identify whether "
        "there is any information that can potentially help address "
        "checklist items or fulfill knowledge gaps of the task, "
        "but whose content is limited or only briefly mentioned.\n"
        "**Task Description:**\n{objective}\n"
        "**Checklist:**\n{checklist}\n"
        "**Knowledge Gaps:**\n{knowledge_gaps}\n"
        "**Search Results:**\n{search_results}\n"
        "**Output:**\n"
    )
    prompt_dict["follow_up_judge_sys_prompt"] = (
        "To provide sufficient external information for the user's "
        "query, you have conducted a web search to obtain additional "
        "data. However, you found that some of the information, while "
        "important, was insufficient. Consequently, you extracted the "
        "entire content from one of the URLs to gather more "
        "comprehensive information. Now, you must rigorously and "
        "carefully assess whether, after both the web search and "
        "extraction process, the information content is adequate to "
        "address the given task. Be aware that any arbitrary decisions "
        "may result in unnecessary and unacceptable time costs.\n"
    )
    prompt_dict[
        "retry_hint"
    ] = "Something went wrong when {state}. I need to retry."
    prompt_dict["need_deeper_hint"] = (
        "The information is insufficient and I need to make deeper "
        "research to fill the knowledge gap."
    )
    prompt_dict[
        "sufficient_hint"
    ] = "The information after web search and extraction is sufficient enough!"
    prompt_dict["no_result_hint"] = (
        "I mistakenly called the `summarize_intermediate_results` tool as "
        "there exists no milestone result to summarize now."
    )
    prompt_dict["summarize_hint"] = (
        "Based on your work history above, examine which step in the "
        "following working plan has been completed. Mark the completed "
        "step with [DONE] at the end of its line (e.g., k. step k [DONE]) "
        "and leave the uncompleted steps unchanged. You MUST return only "
        "the updated plan, preserving exactly the same format as the "
        "original plan. Do not include any explanations, reasoning, "
        # Fixed: adjacent literals previously concatenated to
        # "...just output theupdated plan..." (missing space).
        "or section headers such as '## Working Plan:', just output the "
        "updated plan itself."
        "\n\n## Working Plan:\n{plan}"
    )
    # NOTE(review): the labels below look misaligned with their
    # placeholders (Checklist -> {knowledge_gaps}, Knowledge Gaps ->
    # {working_plan}); kept as-is because renaming placeholders would
    # break callers' .format() keywords — confirm intended mapping.
    prompt_dict["summarize_inst"] = (
        "**Task Description:**\n{objective}\n"
        "**Checklist:**\n{knowledge_gaps}\n"
        "**Knowledge Gaps:**\n{working_plan}\n"
        "**Search Results:**\n{tool_result}"
    )
    prompt_dict["update_report_hint"] = (
        "Due to the overwhelming quantity of information, I have replaced the "
        "original bulk search results from the research phase with the "
        "following report that consolidates and summarizes the essential "
        "findings:\n {intermediate_report}\n\n"
        "Such report has been saved to the {report_path}. "
        "I will now **proceed to the next item** in the working plan."
    )
    prompt_dict["save_report_hint"] = (
        "The milestone results of the current item in working plan "
        "are summarized into the following report:\n{intermediate_report}"
    )
    prompt_dict["reflect_instruction"] = (
        "## Work History:\n{conversation_history}\n"
        "## Working Plan:\n{plan}\n"
    )
    prompt_dict["subtask_complete_hint"] = (
        "Subtask {cur_obj} is completed. Now the current subtask "
        "fallbacks to '{next_obj}'"
    )
    return prompt_dict