327 lines
10 KiB
Python
327 lines
10 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""The utilities for deep research agent"""
|
||
import os
|
||
import json
|
||
import re
|
||
from typing import Union, Sequence, Any, Type
|
||
from pydantic import BaseModel
|
||
|
||
from agentscope.tool import Toolkit, ToolResponse
|
||
|
||
|
||
TOOL_RESULTS_MAX_WORDS = 5000
|
||
|
||
|
||
def get_prompt_from_file(
    file_path: str,
    return_json: bool,
) -> Union[str, dict]:
    """Load a prompt from a file.

    Args:
        file_path: Path to the prompt file.
        return_json: If True, parse the file content as JSON and return a
            dict; otherwise return the raw text.

    Returns:
        The parsed JSON object when ``return_json`` is True, otherwise the
        file content as a string.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If ``return_json`` is True and the content is
            not valid JSON.
    """
    # The original wrapped ``file_path`` in a single-argument
    # ``os.path.join``, which is a no-op; the path is used directly.
    with open(file_path, "r", encoding="utf-8") as f:
        return json.load(f) if return_json else f.read()
|
||
|
||
|
||
def truncate_by_words(
    sentence: str,
    max_words: Union[int, None] = None,
) -> str:
    """Truncate an over-long sentence after a maximum number of words.

    The sentence is tokenized into words (``\\w+`` runs) and single
    punctuation characters. At most ``max_words`` words are kept —
    punctuation does not count toward the limit — and the kept tokens are
    re-joined with a single space before each word and no space before
    punctuation. Original inter-word whitespace is not preserved.

    Args:
        sentence: The text to truncate.
        max_words: Maximum number of words to keep. Defaults to
            ``TOOL_RESULTS_MAX_WORDS`` when None, preserving the original
            behavior.

    Returns:
        The truncated sentence.
    """
    limit = TOOL_RESULTS_MAX_WORDS if max_words is None else max_words

    tokens = re.findall(r"\w+|[^\w\s]", sentence, re.UNICODE)

    kept = []
    word_count = 0
    for token in tokens:
        if re.match(r"\w+", token):
            word_count += 1
            if word_count > limit:
                # Stop before appending the word that exceeds the limit;
                # trailing punctuation after the cut is dropped as well.
                break
        kept.append(token)

    # Re-join: a space before every word except the first token,
    # punctuation glued directly to the preceding token.
    pieces = []
    for i, token in enumerate(kept):
        if i > 0 and re.match(r"\w+", token):
            pieces.append(" ")
        pieces.append(token)
    return "".join(pieces)
|
||
|
||
|
||
def truncate_search_result(
    res: list,
    search_func: str = "tavily-search",
    extract_function: str = "tavily-extract",
) -> list:
    """Truncate the ``text`` field of each search-result entry in place.

    Only the Tavily search/extract tool pair is supported; any other
    combination raises ``NotImplementedError``.

    Args:
        res: List of result dicts, each expected to carry a ``text`` key.
        search_func: Name of the search tool that produced ``res``.
        extract_function: Name of the extraction tool that produced ``res``.

    Returns:
        The same list, with every ``text`` value truncated by word count.
    """
    is_tavily = (
        search_func == "tavily-search"
        and extract_function == "tavily-extract"
    )
    if not is_tavily:
        raise NotImplementedError(
            "Specific implementation of truncation should be provided.",
        )

    # Mutate entries in place and return the list for call-chaining.
    for entry in res:
        entry["text"] = truncate_by_words(entry["text"])
    return res
|
||
|
||
|
||
def generate_structure_output(**kwargs: Any) -> ToolResponse:
    """Wrap arbitrary keyword arguments into a structured tool response.

    Intended to be registered as a tool function: the model "calls" it
    with arguments matching a structured-output specification, and those
    arguments are passed through unchanged as the response metadata.

    Args:
        **kwargs: Fields of the structured output; they should follow the
            expected specification and carry as much detail as the calling
            context requests.

    Returns:
        ToolResponse: A response with no content blocks whose metadata is
        exactly the received keyword arguments.
    """
    return ToolResponse(
        content=[],
        metadata=kwargs,
    )
|
||
|
||
|
||
def get_dynamic_tool_call_json(data_model_type: Type[BaseModel]) -> list[dict]:
    """Build JSON schemas for dynamic tool calling from a Pydantic model.

    A throwaway toolkit is created, ``generate_structure_output`` is
    registered on it, and the given data model is attached as that tool's
    extended model, so the emitted schemas describe the desired structured
    output.

    Args:
        data_model_type: A Pydantic BaseModel class defining the expected
            structure of the tool output.

    Returns:
        A list of JSON-schema dicts for the configured tool, suitable for
        API calls that support structured outputs.

    Example:
        class MyModel(BaseModel):
            name: str
            value: int

        schema = get_dynamic_tool_call_json(MyModel)
    """
    scratch_toolkit = Toolkit()
    scratch_toolkit.register_tool_function(generate_structure_output)
    scratch_toolkit.set_extended_model(
        "generate_structure_output",
        data_model_type,
    )
    return scratch_toolkit.get_json_schemas()
|
||
|
||
|
||
def get_structure_output(blocks: list | Sequence) -> dict:
|
||
"""Extract structured output from a sequence of blocks.
|
||
|
||
Processes a list or sequence of blocks to extract tool use outputs
|
||
and combine them into a single dictionary. This is typically used
|
||
to parse responses from language models that include tool calls.
|
||
|
||
Args:
|
||
blocks: A list or sequence of blocks that may contain tool use
|
||
information. Each block should be a dictionary with 'type'
|
||
and 'input' keys for tool use blocks.
|
||
|
||
Returns:
|
||
A dictionary containing the combined input data from all tool
|
||
use blocks found in the input sequence.
|
||
|
||
Example:
|
||
blocks = [
|
||
{"type": "tool_use", "input": {"name": "test"}},
|
||
{"type": "text", "content": "Some text"},
|
||
{"type": "tool_use", "input": {"value": 42}}
|
||
]
|
||
result = PromptBase.get_structure_output(blocks)
|
||
# result: {"name": "test", "value": 42}
|
||
"""
|
||
|
||
dict_output = {}
|
||
for block in blocks:
|
||
if isinstance(block, dict) and block.get("type") == "tool_use":
|
||
dict_output.update(block.get("input", {}))
|
||
return dict_output
|
||
|
||
|
||
def load_prompt_dict() -> dict:
    """Load all built-in prompts into a single dictionary.

    File-based system prompts are read from the ``built_in_prompt``
    directory next to this module; the remaining templates and hints are
    defined inline. Many values are ``str.format`` templates with named
    placeholders.

    Returns:
        A mapping from prompt name to prompt text.

    Raises:
        OSError: If any of the built-in prompt files cannot be read.
    """
    prompt_dict = {}
    cur_dir = os.path.dirname(os.path.abspath(__file__))

    # System prompts stored as markdown files under built_in_prompt/.
    file_prompts = {
        "add_note": "prompt_worker_additional_sys_prompt.md",
        "tool_use_rule": "prompt_tool_usage_rules.md",
        "decompose_sys_prompt": "prompt_decompose_subtask.md",
        "expansion_sys_prompt": "prompt_deeper_expansion.md",
        "summarize_sys_prompt": "prompt_inprocess_report.md",
        "reporting_sys_prompt": "prompt_deepresearch_summary_report.md",
        "reflect_sys_prompt": "prompt_reflect_failure.md",
    }
    for key, filename in file_prompts.items():
        prompt_dict[key] = get_prompt_from_file(
            file_path=os.path.join(cur_dir, "built_in_prompt", filename),
            return_json=False,
        )

    prompt_dict["reasoning_prompt"] = (
        "## Current Subtask:\n{objective}\n"
        "## Working Plan:\n{plan}\n"
        "{knowledge_gap}\n"
        "## Research Depth:\n{depth}"
    )

    prompt_dict["previous_plan_inst"] = (
        "## Previous Plan:\n{previous_plan}\n"
        "## Current Subtask:\n{objective}\n"
    )

    prompt_dict["max_depth_hint"] = (
        "The search depth has reached the maximum limit. So the "
        "current subtask can not be further decomposed and "
        "expanded anymore. I need to find another way to get it "
        "done no matter what."
    )

    prompt_dict["expansion_inst"] = (
        "Review the web search results and identify whether "
        "there is any information that can potentially help address "
        "checklist items or fulfill knowledge gaps of the task, "
        "but whose content is limited or only briefly mentioned.\n"
        "**Task Description:**\n{objective}\n"
        "**Checklist:**\n{checklist}\n"
        "**Knowledge Gaps:**\n{knowledge_gaps}\n"
        "**Search Results:**\n{search_results}\n"
        "**Output:**\n"
    )

    prompt_dict["follow_up_judge_sys_prompt"] = (
        "To provide sufficient external information for the user's "
        "query, you have conducted a web search to obtain additional "
        "data. However, you found that some of the information, while "
        "important, was insufficient. Consequently, you extracted the "
        "entire content from one of the URLs to gather more "
        "comprehensive information. Now, you must rigorously and "
        "carefully assess whether, after both the web search and "
        "extraction process, the information content is adequate to "
        "address the given task. Be aware that any arbitrary decisions "
        "may result in unnecessary and unacceptable time costs.\n"
    )

    prompt_dict[
        "retry_hint"
    ] = "Something went wrong when {state}. I need to retry."

    prompt_dict["need_deeper_hint"] = (
        "The information is insufficient and I need to make deeper "
        "research to fill the knowledge gap."
    )

    prompt_dict[
        "sufficient_hint"
    ] = "The information after web search and extraction is sufficient enough!"

    prompt_dict["no_result_hint"] = (
        "I mistakenly called the `summarize_intermediate_results` tool as "
        "there exists no milestone result to summarize now."
    )

    # FIX: a space was missing between "the" and "updated" where the two
    # string fragments were concatenated ("...output theupdated plan...").
    prompt_dict["summarize_hint"] = (
        "Based on your work history above, examine which step in the "
        "following working plan has been completed. Mark the completed "
        "step with [DONE] at the end of its line (e.g., k. step k [DONE]) "
        "and leave the uncompleted steps unchanged. You MUST return only "
        "the updated plan, preserving exactly the same format as the "
        "original plan. Do not include any explanations, reasoning, "
        "or section headers such as '## Working Plan:', just output the "
        "updated plan itself."
        "\n\n## Working Plan:\n{plan}"
    )

    # NOTE(review): the section labels and placeholders below look
    # misaligned (Checklist -> {knowledge_gaps}, Knowledge Gaps ->
    # {working_plan}); kept as-is to avoid breaking callers that format
    # these names — confirm the intended pairing.
    prompt_dict["summarize_inst"] = (
        "**Task Description:**\n{objective}\n"
        "**Checklist:**\n{knowledge_gaps}\n"
        "**Knowledge Gaps:**\n{working_plan}\n"
        "**Search Results:**\n{tool_result}"
    )

    prompt_dict["update_report_hint"] = (
        "Due to the overwhelming quantity of information, I have replaced the "
        "original bulk search results from the research phase with the "
        "following report that consolidates and summarizes the essential "
        "findings:\n {intermediate_report}\n\n"
        "Such report has been saved to the {report_path}. "
        "I will now **proceed to the next item** in the working plan."
    )

    prompt_dict["save_report_hint"] = (
        "The milestone results of the current item in working plan "
        "are summarized into the following report:\n{intermediate_report}"
    )

    prompt_dict["reflect_instruction"] = (
        "## Work History:\n{conversation_history}\n"
        "## Working Plan:\n{plan}\n"
    )

    # FIX: quoting was inconsistent (curly quotes around {cur_obj},
    # straight quotes around {next_obj}); normalized to straight quotes.
    prompt_dict["subtask_complete_hint"] = (
        "Subtask '{cur_obj}' is completed. Now the current subtask "
        "fallbacks to '{next_obj}'"
    )

    return prompt_dict
|