Optimize DataJuicer Agent doc & linter (#30)

This commit is contained in:
Daoyuan Chen
2025-11-10 18:17:27 +08:00
committed by GitHub
parent 1f0c5de27f
commit dba3b86ddf
14 changed files with 891 additions and 359 deletions


@@ -126,7 +126,7 @@ A powerful multi-agent data processing system that leverages Data-Juicer's 200+
*Alias-Agent* (short for *Alias*) is designed to serve as an intelligent assistant for tackling diverse and complicated real-world tasks, providing three operational modes for flexible task execution:
- **Simple React**: Employs vanilla reasoning-acting loops to iteratively solve problems and execute tool calls.
- **Planner-Worker**: Uses intelligent planning to decompose complex tasks into manageable subtasks, with dedicated worker agents handling each subtask independently.
- **Built-in Agents**: Leverages specialized agents tailored for specific domains, including *Deep Research Agent* for comprehensive analysis and *Browser-use Agent* for web-based interactions.

Beyond being a ready-to-use agent, we envision Alias as a foundational template that can be adapted to different scenarios.


@@ -1,12 +1,25 @@
# DataJuicer Agent

A multi-agent data processing system built on [AgentScope](https://github.com/agentscope-ai/agentscope) and [Data-Juicer (DJ)](https://github.com/datajuicer/data-juicer). This project demonstrates how to leverage the natural language understanding capabilities of large language models, enabling non-expert users to easily harness the powerful data processing capabilities of Data-Juicer.
## 🎯 Why DataJuicer Agent?
In practical large-model R&D and application work, **data processing remains a high-cost, low-efficiency, and hard-to-reproduce process**. Many teams spend more time on data analysis, cleaning, and synthesis than on model training, requirement alignment, and application development.
We hope to liberate developers from tedious script assembly through agent technology, making data R&D closer to a "think and get" experience.
**Data directly defines the upper limit of model capabilities**. What truly determines model performance are multiple dimensions such as **quality, diversity, harmfulness control, and task matching** of data. Optimizing data is essentially optimizing the model itself. To do this efficiently, we need a systematic toolset.
DataJuicer Agent is designed to support the new paradigm of **data-model co-optimization** as an intelligent collaboration system.
## 📋 Table of Contents
- [🎯 Why DataJuicer Agent?](#-why-datajuicer-agent)
- [📋 Table of Contents](#-table-of-contents)
- [What Does This Agent Do?](#what-does-this-agent-do)
- [Architecture](#architecture)
- [Multi-Agent Routing Architecture](#multi-agent-routing-architecture)
- [Two Integration Modes](#two-integration-modes)
- [Quick Start](#quick-start)
- [System Requirements](#system-requirements)
- [Installation](#installation)
@@ -23,40 +36,67 @@ A multi-agent data processing system built on [AgentScope](https://github.com/mo
- [MCP Server Types](#mcp-server-types)
- [Configuration](#configuration-1)
- [Usage Methods](#usage-methods)
- [Customization and Extension](#customization-and-extension)
- [Custom Prompts](#custom-prompts)
- [Model Replacement](#model-replacement)
- [Extending New Agents](#extending-new-agents)
- [Roadmap](#roadmap)
- [Data-Juicer Q\&A Agent (Demo Available)](#data-juicer-qa-agent-demo-available)
- [Interactive Data Analysis and Visualization Agent (In Development)](#interactive-data-analysis-and-visualization-agent-in-development)
- [Other Directions](#other-directions)
- [Common Issues](#common-issues)
- [Optimization Recommendations](#optimization-recommendations)
- [Related Resources](#related-resources)
## What Does This Agent Do?

Data-Juicer (DJ) is an **open-source processing system covering the full lifecycle of large model data**, providing four core capabilities:
- **Full-Stack Operator Library (DJ-OP)**: Nearly 200 high-performance, reusable multimodal operators covering text, images, and audio/video
- **High-Performance Engine (DJ-Core)**: Built on Ray, supporting TB-level data and 10K-core distributed computing, with operator fusion and multi-granularity fault tolerance
- **Collaborative Development Platform (DJ-Sandbox)**: Introduces A/B testing and scaling-law concepts, using small-scale experiments to drive large-scale optimization
- **Natural Language Interaction Layer (DJ-Agents)**: Enables developers to build data pipelines through conversational interfaces using agent technology

DataJuicer Agent is not a simple Q&A bot, but an **intelligent collaborator for data processing**. Specifically, it can:
- **Intelligent Query**: Automatically match the most suitable operators based on natural language descriptions (precisely locating them among nearly 200 operators)
- **Automated Pipeline**: Describe data processing needs, then automatically generate Data-Juicer YAML configurations and execute them
- **Custom Extension**: Help users develop custom operators and seamlessly integrate them into local environments

**Our goal: let developers focus on "what to do" rather than "how to do it".**
## Architecture

### Multi-Agent Routing Architecture

DataJuicer Agent adopts a **multi-agent routing architecture**, which is key to system scalability. When a user submits a natural language request, the **Router Agent** first performs task triage to determine whether it is a standard data processing task or a custom requirement that needs new capabilities.

```
User Query
    │
Router Agent (Task Triage)
    │
    ├── Standard Data Processing Task → Data Processing Agent (DJ Agent)
    │       ├── Preview data samples (confirm field names and data format)
    │       ├── query_dj_operators (semantic matching of operators)
    │       ├── Generate YAML configuration file
    │       └── execute_safe_command (execute dj-process, dj-analyze)
    │
    └── Custom Operator Development → Code Development Agent (DJ Dev Agent)
            ├── get_basic_files (get base classes and registration mechanism)
            ├── get_operator_example (get similar operator examples)
            ├── Generate code compliant with specifications
            └── Local integration (register to user-specified path)
```
### Two Integration Modes

The Agent integrates with DataJuicer in two modes, adapting to different usage scenarios:
- **Tool Binding Mode**: The Agent calls DataJuicer command-line tools (such as `dj-analyze` and `dj-process`); compatible with existing user habits, low migration cost
- **MCP Binding Mode**: The Agent directly calls DataJuicer's MCP (Model Context Protocol) interface; no intermediate YAML files are generated, operators or data recipes run directly, better performance

The Agent automatically selects between these two modes based on task complexity and performance requirements, ensuring both flexibility and efficiency.
## Quick Start

### System Requirements
@@ -101,19 +141,45 @@ export DATA_JUICER_PATH="your-data-juicer-path"
Choose the running mode using the `-u` or `--use_studio` parameter:

```bash
# Use AgentScope Studio's interactive interface (install and start AgentScope Studio first)
python main.py --use_studio True

# Or use command-line mode directly (default)
python main.py
```
Note: install AgentScope Studio via npm:
```bash
npm install -g @agentscope/studio
```
Start Studio with the following command:
```bash
as_studio
```
## Agent Introduction

### Data Processing Agent

Responsible for interacting with Data-Juicer and executing actual data processing tasks. Supports automatic operator recommendation from natural language descriptions, configuration generation, and execution.
**Workflow:**

When a user says: "My data is saved in xxx, please clean entries with text length less than 5 and image size less than 10MB", the Agent doesn't blindly execute, but proceeds step by step:

1. **Data Preview**: Preview the first 5-10 data samples to confirm field names and data format, a crucial step to avoid configuration errors
2. **Operator Retrieval**: Call the `query_dj_operators` tool to semantically match suitable operators
3. **Parameter Decision**: The LLM autonomously decides global parameters (such as `dataset_path` and `export_path`) and specific operator configurations
4. **Configuration Generation**: Generate a standard YAML configuration file
5. **Execute Processing**: Call the `dj-process` command to perform the actual processing

The entire process is both automated and explainable. Users can intervene at any stage to ensure results meet expectations.
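As an illustration, the configuration-generation step might produce a recipe along these lines. This is a hedged sketch: the file paths are hypothetical, and exact operator parameter names should be checked against the Data-Juicer operator reference before use.

```yaml
# Hypothetical recipe sketch for the example request above; verify
# parameter names in the Data-Juicer operator documentation.
dataset_path: ./data/my-dataset.jsonl    # assumed input path
export_path: ./outputs/processed.jsonl   # assumed output path

process:
  - text_length_filter:
      min_len: 5            # keep samples with at least 5 characters of text
  - image_size_filter:
      max_size: "10MB"      # keep samples whose images are at most 10MB
```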
**Typical Use Cases:**
- **Data Cleaning**: Deduplication, removal of low-quality samples, format standardization
- **Multimodal Processing**: Process text, image, and video data simultaneously
@@ -124,9 +190,42 @@ Responsible for interacting with Data-Juicer and executing actual data processin
<img src="assets/dj_agent_image.png" width="100%">
</details>
**Example Execution Flow:**
User input: "The data in ./data/demo-dataset-images.jsonl, remove samples with text field length less than 5 and image size less than 100Kb..."
Agent execution steps:
1. Call `query_dj_operators`, which returns exactly two operators: `text_length_filter` and `image_size_filter`
2. Use the `view_text_file` tool to preview the raw data, confirming the fields are indeed 'text' and 'image'
3. Generate the YAML configuration and save it to a temporary path via `write_text_file`
4. Call `execute_safe_command` to run `dj-process`, which returns the result path

The entire process requires no manual intervention, yet every step is traceable and verifiable. **This is exactly the "automated but not out of control" data processing experience we pursue**.
### Code Development Agent (DJ Dev Agent)

When built-in operators cannot meet requirements, the traditional approach is to check documentation, copy code, adjust parameters, and write tests, a process that can take hours.

The goal of the Code Development Agent is to compress this process to minutes while ensuring code quality. It is powered by the `qwen3-coder-480b-a35b-instruct` model by default.
**Workflow:**
When a user requests: "Help me create an operator that reverses word order and generate unit test files", the Router routes it to DJ Dev Agent.
The Agent's execution process consists of four steps:
1. **Operator Retrieval**: Find existing operators with similar functionality as references
2. **Get Templates**: Pull base class files and typical examples to ensure consistent code style
3. **Generate Code**: Based on the function prototype provided by the user, generate operator classes compliant with DataJuicer specifications
4. **Local Integration**: Register the new operator to the user-specified local codebase path
The entire process transforms vague requirements into runnable, testable, and reusable modules.
**Generated Content:**
- **Operator Implementation**: Create the operator class file, inheriting from the Mapper/Filter base class and registering it with the `@OPERATORS.register_module` decorator
- **Registration Update**: Modify `__init__.py`, adding the new class to the `__all__` list
- **Tests**: Generate unit tests covering multiple scenarios, including edge cases, to ensure robustness
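A minimal sketch of what such a generated operator might look like, using the word-reversal example from above. The `Mapper` base class and `OPERATORS` registry here are simplified stand-ins that mimic Data-Juicer's registration pattern so the sketch runs standalone; the real classes live in the Data-Juicer codebase.

```python
# Standalone sketch: the base class and registry below are stand-ins
# imitating Data-Juicer's registration pattern, not the real library.
class Mapper:
    """Minimal stand-in for Data-Juicer's Mapper base class."""
    def process(self, sample):
        raise NotImplementedError


class _Registry:
    """Stand-in for Data-Juicer's OPERATORS registry."""
    def __init__(self):
        self.modules = {}

    def register_module(self, cls):
        self.modules[cls.__name__] = cls
        return cls


OPERATORS = _Registry()


@OPERATORS.register_module
class ReverseWordsMapper(Mapper):
    """Reverse the order of whitespace-separated words in the 'text' field."""

    def process(self, sample):
        sample['text'] = ' '.join(reversed(sample['text'].split()))
        return sample


if __name__ == '__main__':
    op = OPERATORS.modules['ReverseWordsMapper']()
    print(op.process({'text': 'hello brave new world'})['text'])
    # → world new brave hello
```

In the real workflow, the generated class would subclass the actual `Mapper` and be appended to `__all__` in the corresponding `__init__.py`.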
**Typical Use Cases:**
- **Develop domain-specific filter or transformation operators**
@@ -142,20 +241,21 @@ Assists in developing custom data processing operators, powered by the `qwen3-co
### Operator Retrieval

Operator retrieval is the core of whether the Agent can work accurately. DJ Agent implements an intelligent operator retrieval tool that quickly finds the most relevant operators among Data-Juicer's nearly 200 operators through an independent LLM query step. This is a key component enabling the Data Processing Agent and the Code Development Agent to run accurately.

We don't rely on a single solution, but provide three modes that can be flexibly selected via the `-r` parameter:

#### Retrieval Modes

**LLM Retrieval (default)**
- Uses Qwen-Turbo to understand user requirements at the semantic level, suitable for complex and vague descriptions
- Provides detailed matching reasons and relevance scores
- Higher token consumption, but highest matching accuracy

**Vector Retrieval (vector)**
- Based on DashScope text embedding + FAISS similarity search
- Fast, suitable for batch tasks or rapid prototyping
- No LLM calls needed, lower cost

**Auto Mode (auto)**
- Tries LLM retrieval first and automatically falls back to vector retrieval on failure
@@ -172,22 +272,25 @@ For more parameter descriptions, see `python main.py --help`
### MCP Agent

In addition to command-line tools, Data-Juicer also natively supports MCP (Model Context Protocol) services, an important means of improving performance. MCP services can directly obtain operator information and execute data processing through native interfaces, making them easy to migrate and integrate, with no separate LLM queries or command-line calls.

#### MCP Server Types

Data-Juicer provides two types of MCP server:

**Recipe-Flow MCP (Data Recipe)**
- Provides two tools: `get_data_processing_ops` and `run_data_recipe`
- Retrieves by operator type, applicable modality, and other tags, **without calling an LLM or a vector model**
- Suitable for standardized, high-frequency scenarios, with better performance

**Granular-Operators MCP (Fine-grained Operators)**
- Wraps each built-in operator as an independent tool that runs on call
- Returns all operators by default, but the visible scope can be controlled through environment variables
- Suitable for fine-grained control and building fully customized data processing pipelines

This means that in some scenarios, the Agent's call path can be *shorter, faster, and more direct* than manually writing YAML.

For detailed information, please refer to the [Data-Juicer MCP Service Documentation](https://datajuicer.github.io/data-juicer/en/main/docs/DJ_service.html#mcp-server)

> **Note**: The Data-Juicer MCP server is currently in early development, and features and tools may change with ongoing development.
@@ -211,13 +314,37 @@ Enable MCP Agent to replace DJ Agent:
```bash
# Enable MCP Agent and Dev Agent
python main.py --available_agents [dj_mcp,dj_dev]

# Or use shorthand
python main.py -a [dj_mcp,dj_dev]
```
## Customization and Extension
### Custom Prompts
All Agent system prompts are defined in the `prompts.py` file.
### Model Replacement

You can specify different models for different Agents in `main.py`. For example:
- The main Agent uses `qwen-max` for complex reasoning
- The Development Agent uses `qwen3-coder-480b-a35b-instruct` to optimize code generation quality

The Formatter and Memory components can also be swapped out. This design keeps the system both ready out of the box and adaptable to enterprise-level requirements.
### Extending New Agents

DataJuicer Agent is an open framework. The core is the `agents2toolkit` function, which can automatically wrap any Agent as a tool callable by the Router.

Simply add your Agent instance to the `agents` list, and the Router will dynamically generate the corresponding tools at runtime and route automatically based on task semantics.

This means you can quickly build domain-specific data agents on top of this framework. *Extensibility is an important design principle.*
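The extension pattern can be sketched as follows. Only the name `agents2toolkit` comes from the project; the Agent interface and the function body here are simplified, hypothetical stand-ins meant to convey the idea, not the real API.

```python
# Hypothetical sketch of the agents2toolkit extension pattern.
# The real function and Agent interface live in the project codebase.
class EchoAgent:
    """Toy custom agent that simply echoes its input."""
    name = "echo_agent"
    description = "Echoes the user query back."

    def reply(self, query):
        return f"echo: {query}"


def agents2toolkit(agents):
    # Wrap each agent's reply method as a named tool the Router can call.
    return {agent.name: agent.reply for agent in agents}


agents = [EchoAgent()]              # append your custom agent here
toolkit = agents2toolkit(agents)
print(toolkit["echo_agent"]("hello"))
# → echo: hello
```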
## Roadmap

The Data-Juicer agent ecosystem is rapidly expanding. Here are the new agents currently in development or planned:
@@ -230,11 +357,24 @@ Provides users with detailed answers about Data-Juicer operators, concepts, and
Your browser does not support the video tag.
</video>

### Interactive Data Analysis and Visualization Agent (In Development)

We are building a more advanced **human-machine collaborative data optimization workflow** that introduces human feedback:
- Users can view statistics, attribution analysis, and visualization results
- Dynamically edit recipes, approve or reject suggestions
- Underpinned by `dj.analyzer` (data analysis), `dj.attributor` (effect attribution), and `dj.sandbox` (experiment management)
- Supports closed-loop optimization based on validation tasks
### Other Directions
- **Data Processing Agent Benchmarking**: Quantify the performance of different Agents in terms of accuracy, efficiency, and robustness
- **Data "Health Check" Reports & Intelligent Data Recommendation**: Automatically diagnose data problems and recommend optimization solutions
- **Router Agent Enhancement**: More seamless handoffs, e.g., when operators are missing: Code Development Agent → Data Processing Agent
- **Further MCP Optimization**: Embedded LLM, so users can connect MCP directly to their local environment (e.g., an IDE) and get an experience similar to the current Data Processing Agent
- **Knowledge Base and RAG-oriented Data Agents**
- **Better Automatic Processing Solution Generation**: Less token usage, higher efficiency, higher-quality processing results
- **Data Workflow Template Reuse and Automatic Tuning**: Based on DataJuicer community data recipes
- ...
### Common Issues
@@ -250,12 +390,25 @@ A: Ensure Data-Juicer path is configured correctly and check the example code pr
**Q: What to do if the MCP service connection fails?**

A: Check whether the MCP server is running and confirm the URL in the configuration file is correct.
**Q: Error: requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: http://localhost:3000/trpc/pushMessage**
A: Please check if AgentScope Studio has been successfully started. Try installing AgentScope Studio first with `npm install -g @agentscope/studio`, then start it with `as_studio`.
### Optimization Recommendations

- For large-scale data processing, it is recommended to use DataJuicer's distributed mode
- Set the batch size appropriately to balance memory usage and processing speed
- For more advanced data processing features (synthesis, data-model co-development), please refer to the DataJuicer [documentation](https://datajuicer.github.io/data-juicer/en/main/index.html)
---
## Related Resources

DataJuicer has been used by a large number of Tongyi and Alibaba Cloud internal and external users, and has facilitated many research works. All code is continuously maintained and enhanced.

*Welcome to visit GitHub, star, fork, submit Issues, and join the community!*

- **Project Repositories**:
  - [AgentScope](https://github.com/agentscope-ai/agentscope)
  - [DataJuicer](https://github.com/datajuicer/data-juicer)

**Contributing**: We welcome Issues and Pull Requests that improve AgentScope, DataJuicer Agent, and DataJuicer. If you encounter problems during use or have feature suggestions, please feel free to contact us.


@@ -1,12 +1,25 @@
# DataJuicer Agent

A multi-agent data processing system built on [AgentScope](https://github.com/agentscope-ai/agentscope) and [Data-Juicer (DJ)](https://github.com/datajuicer/data-juicer). This project demonstrates how to leverage the natural language understanding capabilities of large language models, enabling non-expert users to easily harness Data-Juicer's powerful data processing capabilities.
## 🎯 Why DataJuicer Agent?

In practical large-model R&D and application work, **data processing remains a high-cost, low-efficiency, hard-to-reproduce stage**. Many teams spend more time on data analysis, cleaning, and synthesis than on model training, requirement alignment, and application development.

We hope to use agent technology to free developers from tedious script assembly, bringing data R&D closer to a "think it and get it" experience.

**Data directly defines the upper limit of model capabilities**. What truly determines model performance are dimensions such as the **quality, diversity, harmfulness control, and task match** of the data. Optimizing data is, in essence, optimizing the model itself. To do this efficiently, we need a systematic toolset.

DataJuicer Agent is an intelligent collaboration system designed to support this new paradigm of **data-model co-optimization**.
## 📋 Table of Contents
- [🎯 Why DataJuicer Agent](#-why-datajuicer-agent)
- [📋 Table of Contents](#-table-of-contents)
- [What Does This Agent Do?](#what-does-this-agent-do)
- [Architecture](#architecture)
- [Multi-Agent Routing Architecture](#multi-agent-routing-architecture)
- [Two Integration Modes](#two-integration-modes)
- [Quick Start](#quick-start)
- [System Requirements](#system-requirements)
- [Installation](#installation)
@@ -23,39 +36,67 @@
- [MCP Server Types](#mcp-server-types)
- [Configuration](#configuration-1)
- [Usage Methods](#usage-methods)
- [Customization and Extension](#customization-and-extension)
- [Custom Prompts](#custom-prompts)
- [Model Replacement](#model-replacement)
- [Extending New Agents](#extending-new-agents)
- [Roadmap](#roadmap)
- [Data-Juicer Q\&A Agent (Demo Available)](#data-juicer-qa-agent-demo-available)
- [Interactive Data Analysis and Visualization Agent (In Development)](#interactive-data-analysis-and-visualization-agent-in-development)
- [Other Directions](#other-directions)
- [Common Issues](#common-issues)
- [Optimization Recommendations](#optimization-recommendations)
- [Related Resources](#related-resources)
## What Does This Agent Do?

Data-Juicer (DJ) is an **open-source processing system covering the full lifecycle of large model data**, providing four core capabilities:
- **Full-Stack Operator Library (DJ-OP)**: Nearly 200 high-performance, reusable multimodal operators covering text, images, and audio/video
- **High-Performance Engine (DJ-Core)**: Built on Ray, supporting TB-level data and 10K-core distributed computing, with operator fusion and multi-granularity fault tolerance
- **Collaborative Development Platform (DJ-Sandbox)**: Introduces A/B testing and scaling-law concepts, using small-scale experiments to drive large-scale optimization
- **Natural Language Interaction Layer (DJ-Agents)**: Enables developers to build data pipelines through conversational interfaces using agent technology

DataJuicer Agent is not a simple Q&A bot, but an **intelligent collaborator for data processing**. Specifically, it can:
- **Intelligent Query**: Automatically match the most suitable operators based on natural language descriptions (precisely locating them among nearly 200 operators)
- **Automated Pipeline**: Describe data processing needs, then automatically generate Data-Juicer YAML configurations and execute them
- **Custom Extension**: Help users develop custom operators and seamlessly integrate them into local environments

**Our goal: let developers focus on "what to do" rather than "how to do it".**
## Architecture

### Multi-Agent Routing Architecture

DataJuicer Agent adopts a **multi-agent routing architecture**, which is key to system scalability. When a user submits a natural language request, the **Router Agent** first performs task triage to determine whether it is a standard data processing task or a custom requirement that needs new capabilities.

```
User Query
    │
Router Agent (Task Triage)
    │
    ├── Standard Data Processing Task → Data Processing Agent (DJ Agent)
    │       ├── Preview data samples (confirm field names and data format)
    │       ├── query_dj_operators (semantic matching of operators)
    │       ├── Generate YAML configuration file
    │       └── execute_safe_command (execute dj-process, dj-analyze)
    │
    └── Custom Operator Development → Code Development Agent (DJ Dev Agent)
            ├── get_basic_files (get base classes and registration mechanism)
            ├── get_operator_example (get similar operator examples)
            ├── Generate code compliant with specifications
            └── Local integration (register to user-specified path)
```
### Two Integration Modes

The Agent integrates with DataJuicer in two modes, adapting to different usage scenarios:
- **Tool Binding Mode**: The Agent calls DataJuicer command-line tools (such as `dj-analyze` and `dj-process`); compatible with existing user habits, low migration cost
- **MCP Binding Mode**: The Agent directly calls DataJuicer's MCP (Model Context Protocol) interface; no intermediate YAML files are generated, operators or data recipes run directly, better performance

The Agent automatically selects between these two modes based on task complexity and performance requirements, ensuring both flexibility and efficiency.
## Quick Start

### System Requirements
@@ -100,19 +141,45 @@ export DATA_JUICER_PATH="your-data-juicer-path"
Choose the running mode using the `-u` or `--use_studio` parameter:

```bash
# Use AgentScope Studio's interactive interface (install and start AgentScope Studio first)
python main.py --use_studio True

# Or use command-line mode directly (default)
python main.py
```
Note: install AgentScope Studio via npm:
```bash
npm install -g @agentscope/studio
```
Start Studio with the following command:
```bash
as_studio
```
## Agent Introduction

### Data Processing Agent

Responsible for interacting with Data-Juicer and executing actual data processing tasks. Supports automatic operator recommendation from natural language descriptions, configuration generation, and execution.
**Workflow:**

When a user says: "My data is saved in xxx, please clean entries with text length less than 5 and image size less than 10MB", the Agent doesn't blindly execute, but proceeds step by step:

1. **Data Preview**: Preview the first 5-10 data samples to confirm field names and data format, a crucial step to avoid configuration errors
2. **Operator Retrieval**: Call the `query_dj_operators` tool to semantically match suitable operators
3. **Parameter Decision**: The LLM autonomously decides global parameters (such as `dataset_path` and `export_path`) and specific operator configurations
4. **Configuration Generation**: Generate a standard YAML configuration file
5. **Execute Processing**: Call the `dj-process` command to perform the actual processing

The entire process is both automated and explainable. Users can intervene at any stage to ensure results meet expectations.
**Typical Use Cases:**
- **Data Cleaning**: Deduplication, removal of low-quality samples, format standardization
- **Multimodal Processing**: Process text, image, and video data simultaneously
@@ -123,9 +190,42 @@ python main.py
<img src="assets/dj_agent_image.png" width="100%">
</details>
**Example Execution Flow:**

User input: "The data in ./data/demo-dataset-images.jsonl, remove samples with text field length less than 5 and image size less than 100Kb..."

Agent execution steps:
1. Call `query_dj_operators`, which returns exactly two operators: `text_length_filter` and `image_size_filter`
2. Use the `view_text_file` tool to preview the raw data, confirming the fields are indeed 'text' and 'image'
3. Generate the YAML configuration and save it to a temporary path via `write_text_file`
4. Call `execute_safe_command` to run `dj-process`, which returns the result path

The entire process requires no manual intervention, yet every step is traceable and verifiable. **This is exactly the "automated but not out of control" data processing experience we pursue**.
### Code Development Agent

When built-in operators cannot meet requirements, the traditional approach is to check documentation, copy code, adjust parameters, and write tests, a process that can take hours.

The goal of the Code Development Agent is to compress this process to minutes while ensuring code quality. It is powered by the `qwen3-coder-480b-a35b-instruct` model by default.
**Workflow:**

When a user requests: "Help me create an operator that reverses word order and generate unit test files", the Router routes the task to the DJ Dev Agent.

The Agent's execution process consists of four steps:
1. **Operator Retrieval**: Find existing operators with similar functionality as references
2. **Get Templates**: Pull base class files and typical examples to ensure consistent code style
3. **Generate Code**: Based on the function prototype provided by the user, generate operator classes compliant with DataJuicer specifications
4. **Local Integration**: Register the new operator to the user-specified local codebase path

The entire process turns a vague requirement into a runnable, testable, and reusable module.
**Generated Content:**
- **Operator Implementation**: Create the operator class file, inheriting from the Mapper/Filter base class and registering it with the `@OPERATORS.register_module` decorator
- **Registration Update**: Modify `__init__.py`, adding the new class to the `__all__` list
- **Tests**: Generate unit tests covering multiple scenarios, including edge cases, to ensure robustness
**Typical Use Cases:**
- **Develop domain-specific filter or transformation operators**
@@ -141,20 +241,21 @@ python main.py
### 算子检索

算子检索是 Agent 能否精准工作的核心。DJ 智能体实现了一个智能算子检索工具,通过独立的 LLM 查询环节从 Data-Juicer 的近200个算子中快速找到最相关的算子。这是数据处理智能体和代码开发智能体能够准确运行的关键组件。

我们没有采用单一方案,而是提供了三种模式,通过 `-r` 参数灵活选择:

#### 检索模式

**LLM 检索 (默认)**
- 使用 Qwen-Turbo 从语义层面理解用户需求,适合复杂、模糊的描述
- 提供详细的匹配理由和相关性评分
- Token 消耗较高,但匹配精度最高

**向量检索 (vector)**
- 基于 DashScope 文本嵌入 + FAISS 相似度搜索
- 速度快,适合批量任务或快速原型
- 无需调用 LLM,成本更低

**自动模式 (auto)**
- 优先尝试 LLM 检索,失败时自动降级到向量检索
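向量检索的核心逻辑可以用 numpy 简要示意(假设性玩具示例:真实系统中向量来自 DashScope 文本嵌入、检索由 FAISS 索引完成,这里仅用手写向量和暴力余弦相似度说明原理):

```python
import numpy as np

# 玩具语料:每个算子对应一个(假想的)嵌入向量
op_names = [
    "text_length_filter",
    "image_size_filter",
    "language_id_score_filter",
]
op_vecs = np.array([
    [1.0, 0.0, 0.1],  # 文本长度相关
    [0.0, 1.0, 0.1],  # 图片大小相关
    [0.6, 0.0, 1.0],  # 语言识别相关
])
query_vec = np.array([0.9, 0.1, 0.0])  # "过滤过短文本"的假想查询嵌入

# 余弦相似度 = 点积 / 模长乘积,取相似度最高的前 2 个算子
sims = op_vecs @ query_vec / (
    np.linalg.norm(op_vecs, axis=1) * np.linalg.norm(query_vec)
)
top = [op_names[i] for i in np.argsort(-sims)[:2]]
print(top)
```

FAISS 所做的,本质上是把这一步的暴力计算换成可扩展的近似最近邻索引。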
@@ -171,22 +272,25 @@ python main.py --retrieve_mode vector
### MCP 智能体

除了命令行,DataJuicer 还原生支持 MCP (Model Context Protocol) 服务,这是提升性能的重要手段。MCP 服务可直接通过原生接口获取算子信息、执行数据处理,易于迁移和集成,无需单独的 LLM 查询和命令行调用。

#### MCP 服务器类型

Data-Juicer 提供两种 MCP 服务:

**Recipe-Flow MCP(数据菜谱)**
- 提供 `get_data_processing_ops` 和 `run_data_recipe` 两个工具
- 通过算子类型、适用模态等标签进行检索,**无需调用 LLM 或向量模型**
- 适合标准化、高频场景,性能更优
**Granular-Operators细粒度算子**
- 将每个算子作为独立工具提供
- 通过环境变量灵活指定算子列表
- 构建完全定制化的数据处理管道
详细信息请参考:[Data-Juicer MCP 服务文档](https://modelscope.github.io/data-juicer/en/main/docs/DJ_service.html#mcp-server) **Granular-Operators MCP细粒度算子**
- 将每个内置算子包装为独立工具,调用即运行
- 默认返回所有算子,但可通过环境变量控制可见范围
- 适合精细化控制,构建完全定制化的数据处理管道
这意味着在某些场景下Agent 的调用路径可以比手动写 YAML *更短、更快、更直接*
详细信息请参考:[Data-Juicer MCP 服务文档](https://datajuicer.github.io/data-juicer/en/main/docs/DJ_service.html#mcp-server)
> **注意**:Data-Juicer MCP 服务器目前处于早期开发阶段,功能和工具可能会随着持续开发而变化。
@@ -210,14 +314,38 @@ Data-Juicer 提供两种 MCP 服务器模式:
```bash
# 启用 MCP 智能体和开发智能体
python main.py --available_agents [dj_mcp,dj_dev]

# 或使用简写
python main.py -a [dj_mcp,dj_dev]
```
## 定制化与扩展
### 自定义 Prompts
所有 Agent 的系统提示词都定义在 `prompts.py` 文件中。
### 更换模型
你可以在 `main.py` 中为不同 Agent 指定不同模型。例如:
- 主 Agent 使用 `qwen-max` 处理复杂推理
- 开发 Agent 使用 `qwen3-coder-480b-a35b-instruct` 优化代码生成质量
同时Formatter 和 Memory 也可替换。这种设计让系统既能开箱即用,又能适配企业级需求。
### 扩展新智能体
DataJuicer Agent 是一个开放框架。核心在于 `agents2toolkit` 函数——它能将任意 Agent 自动包装为 Router 可调用的工具。
只需将你的 Agent 实例加入 `agents` 列表Router 就会在运行时动态生成对应工具,并根据任务语义自动路由。
这意味着,你可以基于此框架,快速构建领域专属的数据智能体。
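`agents2toolkit` 的包装思路可以抽象成如下草图(桩实现,仅演示"把 Agent 变成可路由工具"的模式;真实版本基于 AgentScope 的 `Toolkit` 与 `tools/router_helpers.py` 中的 `agent_to_tool`):

```python
# 桩实现示意:真实代码见 tools/__init__.py 与 tools/router_helpers.py
class EchoAgent:
    """一个最小的示例 Agent,仅回显任务描述。"""

    def __init__(self, name, doc):
        self.name = name
        self.__doc__ = doc  # Router 依据 docstring 理解该 Agent 的能力

    def __call__(self, task):
        return f"[{self.name}] handled: {task}"


def agent_to_tool(agent):
    # 将 Agent 包装成普通函数工具,函数名与文档即 Agent 的名字与能力描述
    def tool(task: str):
        return agent(task)
    tool.__name__ = agent.name
    tool.__doc__ = agent.__doc__
    return tool


def agents2toolkit(agents):
    # Router 在运行时据此动态获得一组可调用工具
    return {t.__name__: t for t in (agent_to_tool(a) for a in agents)}


toolkit = agents2toolkit([
    EchoAgent("dj", "数据处理"),
    EchoAgent("dj_dev", "算子开发"),
])
print(toolkit["dj"]("清洗数据"))  # → [dj] handled: 清洗数据
```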
*扩展性,是我们设计的重要原则*
## Roadmap
Data-Juicer 智能体生态系统正在快速扩展,以下是当前正在开发或计划中的新智能体:
@@ -230,9 +358,24 @@ Data-Juicer 智能体生态系统正在快速扩展,以下是当前正在开
您的浏览器不支持视频标签。
</video>

### 交互式数据分析与可视化智能体 (开发中)

我们正在构建更高级的**人机协同数据优化工作流**,引入人类反馈:
- 用户可查看统计、归因分析以及可视化结果
- 动态编辑菜谱,批准或拒绝建议
- 底层由 `dj.analyzer`(数据分析)、`dj.attributor`(效果归因)、`dj.sandbox`(实验管理)共同支撑
- 支持基于验证任务的闭环优化
### 其它方向
- **数据处理智能体 Benchmarking**:量化不同 Agent 在准确性、效率、鲁棒性上的表现
- **数据"体检报告" & 数据智能推荐**:自动诊断数据问题并推荐优化方案
- **Router Agent 增强**:路由更无感、丝滑(例如当缺少合适算子时,自动先经代码开发 Agent 开发算子,再交由数据处理 Agent 执行)
- **MCP 进一步优化**:内嵌 LLM,用户可直接通过 MCP 连接自己的本地环境(如 IDE),获得与目前数据处理 Agent 类似的体验
- **面向知识库、RAG 的数据智能体**
- **更好的处理方案自动生成**:更少 token 用量,更高效,更优质处理结果
- **数据工作流模版复用及自动调优**:基于 DataJuicer 社区数据菜谱
- ......
### 常见问题
@@ -248,13 +391,23 @@ A: 确保 Data-Juicer 路径配置正确,并查看代码开发智能体提供
**Q: MCP 服务连接失败怎么办?**
A: 检查 MCP 服务器是否正在运行,确认配置文件中的 URL 地址正确。
**Q: 报错:requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: http://localhost:3000/trpc/pushMessage**
A: 请检查 agentscope studio 是否已成功启动。可先通过 `npm install -g @agentscope/studio` 安装 agentscope studio,然后运行 `as_studio` 启动。
### 优化建议

- 对于大规模数据处理,建议使用 DataJuicer 提供的分布式模式
- 合理设置批处理大小以平衡内存使用和处理速度
- 更多进阶数据处理(合成、Data-Model Co-Development 等)特性能力,请参考 DataJuicer [文档页](https://datajuicer.github.io/data-juicer/zh_CN/main/index_ZH)
---
## 相关资源
- DataJuicer 已被大量通义和阿里云内外部用户使用,并衍生出多项研究工作。所有代码均在持续维护与增强中。
*欢迎访问 GitHubStar、Fork、提 Issue以及加入社区共建*
- **项目地址**
- [AgentScope](https://github.com/agentscope-ai/agentscope)
- [DataJuicer](https://github.com/datajuicer/data-juicer)
**贡献指南**:欢迎提交 Issue 和 Pull Request 来改进 agentscope、DataJuicer Agent 及 DataJuicer。如果您在使用过程中遇到问题或有功能建议请随时联系我们。
@@ -2,7 +2,8 @@
""" """
Agent Factory Agent Factory
Factory functions for creating and configuring agents with standardized toolkits. Factory functions for creating and configuring agents
with standardized toolkits.
""" """
import os import os
@@ -86,7 +87,7 @@ def create_agent(
parallel_tool_calls=parallel_tool_calls,
**kwargs,
)
agent.__doc__ = description
return agent
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
import os
from typing import List
import fire
from agentscope.model import DashScopeChatModel
from agentscope.formatter import DashScopeChatFormatter
@@ -9,7 +9,12 @@ from agentscope.memory import InMemoryMemory
from agentscope.agent import UserAgent
from agent_factory import create_agent
from prompts import (  # pylint: disable=no-name-in-module
DJ_SYS_PROMPT,
DJ_DEV_SYS_PROMPT,
ROUTER_SYS_PROMPT,
MCP_SYS_PROMPT,
)
from tools import (
dj_toolkit,
dj_dev_toolkit,
@@ -41,17 +46,23 @@ user = UserAgent("User")
async def main(
use_studio: bool = False,
available_agents: List[str] = None,
retrieval_mode: str = "auto",
):
"""
Main function for running the agent.
:param use_studio: Whether to use agentscope studio.
:param available_agents: List of available agents.
Options: dj, dj_dev, dj_mcp
Default: ["dj", "dj_dev"]
:param retrieval_mode: Retrieval mode for operators.
Options: auto, vector, llm
"""
if available_agents is None:
available_agents = ["dj", "dj_dev"]
if "dj" in available_agents:
# Set global retrieval mode for tools to use
os.environ["RETRIEVAL_MODE"] = retrieval_mode
@@ -66,12 +77,14 @@ async def main(
DJ_SYS_PROMPT,
dj_toolkit,
(
"A professional data preprocessing AI assistant with the "
"following core capabilities: \n"
"Tool Matching \n"
"- Query and validate suitable DataJuicer operators; \n"
"Configuration Generation \n"
"- Create YAML configuration files and preview data; \n"
"Task Execution - Run data processing pipelines and "
"output results"
),
model,
formatter,
@@ -86,11 +99,15 @@ async def main(
DJ_DEV_SYS_PROMPT,
dj_dev_toolkit,
(
"An expert DataJuicer development assistant specializing "
"in creating new DataJuicer operators. \n"
"Core capabilities: \n"
"Reference Retrieval - fetch base classes and examples; \n"
"Environment Configuration - handle DATA_JUICER_PATH "
"setup. if user provides a DataJuicer path requiring "
"setup/update, please call this agent;\n; "
"Code Generation - write complete, convention-compliant "
"operator code"
),
dev_model,
formatter,
@@ -108,9 +125,11 @@ async def main(
MCP_SYS_PROMPT,
mcp_toolkit,
(
"DataJuicer MCP Agent powered by Recipe Flow MCP "
"server. \n"
"Core capabilities: \n"
"- Filter operators by tags/categories using MCP "
"protocol; \n"
"- Real-time data processing pipeline execution. \n"
),
model,
@@ -119,12 +138,16 @@ async def main(
)
agents.append(mcp_agent)
# Router agent - uses agents2tools to dynamically generate tools from
# all agents
router_agent = create_agent(
"Router",
ROUTER_SYS_PROMPT,
agents2toolkit(agents),
(
"A router agent that intelligently routes tasks to specialized "
"DataJuicer agents"
),
model,
formatter,
InMemoryMemory(),  # Router uses its own memory instance
@@ -143,7 +166,8 @@ async def main(
msg = await user(msg)
if msg.get_text_content() == "exit":
break
# Router agent handles the entire task with automatic multi-step
# routing
msg = await router_agent(msg)
@@ -151,13 +175,15 @@ if __name__ == "__main__":
# Example tasks
# project_root = os.path.abspath(os.path.dirname(__file__))
# task = (
#     f"The data is stored in "
#     "{project_root}/data/demo-dataset-images.jsonl. "
#     "Among the samples, the text field length is less than 5 "
#     "and the image size is less than 100Kb. "
#     "And save the output results to the ./outputs path."
# )
#
# DJ Development example task:
# task = ("I want to develop a new DataJuicer filter operator to filter "
#         "out audio files without vocals")
#
fire.Fire(main)
@@ -1,54 +1,73 @@
# -*- coding: utf-8 -*-
DJ_SYS_PROMPT = """
You are an expert data preprocessing assistant named {name}, specializing in
handling multimodal data including text, images, videos, and other AI
model-related data.
You will strictly follow these steps sequentially:
- Data Preview (optional but recommended):
Before generating the YAML, you may first use `view_text_file` to inspect
a small subset of the raw data (e.g., the first 5-10 samples) so that you
can:
1. Verify the exact field names and formats;
2. Decide appropriate values such as `text_keys`, `image_key`, and the
parameters of subsequent operators.
If the user requests or needs more specific data analysis, use
`dj-analyzer` to analyze the data:
1. After creating the configuration file according to the requirements,
run it (see Step 2 for the configuration file creation method):
dj-analyze --config configs/your_analyzer.yaml
2. you can also use auto mode to avoid writing a recipe. It will analyze
a small part (e.g. 1000 samples, specified by argument `auto_num`) of
your dataset with all Filters that produce stats.
dj-analyze --auto --dataset_path xx.jsonl [--auto_num 1000]
Step 1: Tool Discovery and Matching
- First, use the `query_dj_operators` tool to get relevant DataJuicer
operators based on the user's task description
- Analyze the retrieved operators and verify if they have exact functional
matches with the input query
- If no suitable operators are found, immediately terminate the task
- If partially supported operators exist, skip incompatible parts and
proceed
Step 2: Generate Configuration File
- Create a YAML configuration containing global parameters and tool
configurations. Save it to a YAML file with yaml dump api.
After successful file creation, inform the user of the file location.
File save failure indicates task failure.
a. Global Parameters:
- project_name: Project name
- dataset_path: Real data path (never fabricate paths. Set to `None`
if unknown)
- export_path: Output path (use default if unspecified)
- text_keys: Text field names to process
- image_key: Image field name to process
- np: Multiprocessing count
Keep other parameters as defaults.
b. Operator Configuration:
- Use the operators retrieved from Step 1 to configure the 'process'
field
- Ensure precise functional matching with user requirements
Step 3: Execute Processing Task
Pre-execution checks:
- dataset_path: Must be a valid user-provided path and the path must
exist
- process: Operator configuration list must exist
Terminate immediately if any check fails and explain why.
If all pre-execution checks are valid, run:
`dj-process --config ${{YAML_config_file}}`
Mandatory Requirements:
- Never ask me questions. Make reasonable assumptions for non-critical
parameters
- Only generate the reply after the task has finished running
- Always start by retrieving relevant operators using the query_dj_operators
tool
Configuration Template:
```yaml
@@ -60,7 +79,8 @@ image_key: {{image key to be processed}}
np: {{number of subprocess to process your dataset}}
skip_op_error: false # must set to false
export_path: {{single file path to save processed data, must be a jsonl file
path not a folder}}
# process schedule
# a list of several process operators with their arguments
@@ -85,14 +105,19 @@ Function definitions:
""" """
DJ_DEV_SYS_PROMPT = """ DJ_DEV_SYS_PROMPT = """
You are an expert DataJuicer operator development assistant named {name}, specializing in helping developers create new DataJuicer operators. You are an expert DataJuicer operator development assistant named {name},
specializing in helping developers create new DataJuicer operators.
Development Workflow: Development Workflow:
1. Understand user requirements and identify operator type (filter, mapper, deduplicator, etc.) 1. Understand user requirements and identify operator type (filter, mapper,
deduplicator, etc.)
2. Call `get_basic_files()` to get base_op classes and development guidelines 2. Call `get_basic_files()` to get base_op classes and development guidelines
3. Call `get_operator_example(operator_type)` to get relevant examples 3. Call `get_operator_example(operator_type)` to get relevant examples
4. If previous tools report `DATA_JUICER_PATH` not configured, **STOP** and request user input with a clear message asking for the value of `DATA_JUICER_PATH` 4. If previous tools report `DATA_JUICER_PATH` not configured, **STOP** and
5. Once the user provides `DATA_JUICER_PATH`, call `configure_data_juicer_path(data_juicer_path)` with the provided value request user input with a clear message asking for the value of
`DATA_JUICER_PATH`
5. Once the user provides `DATA_JUICER_PATH`, call
`configure_data_juicer_path(data_juicer_path)` with the provided value
**Do not attempt to set or infer `DATA_JUICER_PATH` on your own** **Do not attempt to set or infer `DATA_JUICER_PATH` on your own**
Critical Requirements: Critical Requirements:
@@ -102,19 +127,27 @@ Critical Requirements:
- Focus on practical implementation
"""
MCP_SYS_PROMPT = """You are {name}, an advanced DataJuicer MCP Agent powered
by MCP server, specializing in handling multimodal data including text,
images, videos, and other AI model-related data.
Analyze user requirements and use the tools provided to you for data
processing.
Before data processing, you can also try:
- Use `view_text_file` to inspect a small subset of the raw data (e.g., the
first 2~5 samples) in order to:
1. Verify the exact field names and formats
2. Determine appropriate parameter values such as text length ranges,
language types, confidence thresholds, etc.
3. Understand data characteristics to optimize operator parameter
configuration
"""
ROUTER_SYS_PROMPT = """
You are an AI routing agent named {name}. Your primary responsibility is to
analyze user queries and route them to the most appropriate specialized agent
for handling.
Key responsibilities:
1. Understand the user's intent and requirements
@@ -122,14 +155,23 @@ Key responsibilities:
3. Handle user input requests from routed agents properly
When routing to an agent that requires user input:
- If the routed agent returns a response indicating that additional input or
configuration is required for user confirmation or submission, you must:
1. Stop the current routing process
2. Present the agent's request to the user directly
3. Wait for user's response before continuing
4. Pass the user's input back to the appropriate agent
- NEVER fabricate or guess user input values (like paths, configurations, etc.)
- Always ask the user for the required information when an agent needs it
Available agents and their capabilities will be provided as tools in your
toolkit.
"""
__all__ = [
"DJ_SYS_PROMPT",
"DJ_DEV_SYS_PROMPT",
"MCP_SYS_PROMPT",
"ROUTER_SYS_PROMPT",
]
@@ -16,17 +16,23 @@ from agentscope.tool import Toolkit
from .dj_helpers import execute_safe_command
from .router_helpers import agent_to_tool
from .dj_helpers import query_dj_operators
from .dj_dev_helpers import (
get_basic_files,
get_operator_example,
configure_data_juicer_path,
)
from .mcp_helpers import get_mcp_toolkit


def create_toolkit(tools: List[AgentBase]):
# Create toolkit and register tools
toolkit = Toolkit()
for tool in tools:
toolkit.register_tool_function(tool)
return toolkit


# DJ Agent tools
dj_tools = [
execute_safe_command,
@@ -50,10 +56,12 @@ mcp_tools = [
write_text_file,
]


def agents2toolkit(agents: List[AgentBase]):
tools = [agent_to_tool(agent) for agent in agents]
return create_toolkit(tools)


dj_toolkit = create_toolkit(dj_tools)
dj_dev_toolkit = create_toolkit(dj_dev_tools)
@@ -71,7 +79,6 @@ __all__ = [
"dj_tools", "dj_tools",
"dj_dev_tools", "dj_dev_tools",
"mcp_tools", "mcp_tools",
"all_tools",
"agents2toolkit", "agents2toolkit",
"dj_toolkit", "dj_toolkit",
"dj_dev_toolkit", "dj_dev_toolkit",
@@ -85,4 +92,4 @@ __all__ = [
"get_basic_files", "get_basic_files",
"get_operator_example", "get_operator_example",
"configure_data_juicer_path", "configure_data_juicer_path",
] ]
@@ -2,8 +2,8 @@
""" """
DataJuicer Development Tools DataJuicer Development Tools
Tools for developing DataJuicer operators, including access to basic documentation Tools for developing DataJuicer operators, including access to basic
and example code for different operator types. documentation and example code for different operator types.
""" """
import os import os
@@ -23,7 +23,8 @@ BASIC_LIST_RELATIVE = [
def get_basic_files() -> ToolResponse:
"""Get basic DataJuicer development files content.
Returns the content of essential files needed for DJ operator
development:
- base_op.py: Base operator class
- DeveloperGuide.md: English developer guide
- DeveloperGuide_ZH.md: Chinese developer guide
@@ -31,19 +32,23 @@ def get_basic_files() -> ToolResponse:
Returns:
ToolResponse: Combined content of all basic development files
"""
global DATA_JUICER_PATH, BASIC_LIST_RELATIVE
if DATA_JUICER_PATH is None:
return ToolResponse(
content=[
TextBlock(
type="text",
text=(
"DATA_JUICER_PATH is not configured. Please ask the "
"user to provide the DATA_JUICER_PATH"
),
),
],
)
try:
comb_content = "# DataJuicer Operator Development Basic Files\n\n"
for relative_path in BASIC_LIST_RELATIVE:
file_path = os.path.join(DATA_JUICER_PATH, relative_path)
@@ -52,20 +57,21 @@ def get_basic_files() -> ToolResponse:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
file_n = os.path.basename(file_path)
comb_content += f"## {file_n}\n\n```"
flag = "python" if file_n.endswith(".py") else "markdown"
comb_content += f"{flag}\n"
comb_content += content
comb_content += "\n```\n\n"
except Exception as e:
comb_content += (
f"## {os.path.basename(file_path)} (Read Failed)\n"
)
comb_content += f"Error: {str(e)}\n\n"
return ToolResponse(
content=[TextBlock(type="text", text=comb_content)],
)
except Exception as e:
return ToolResponse(
@@ -73,32 +79,41 @@ def get_basic_files() -> ToolResponse:
TextBlock(
type="text",
text=f"Error occurred while getting basic files: {str(e)}",
),
],
)


async def get_operator_example(
requirement_description: str,
limit: int = 2,
) -> ToolResponse:
"""Get example operators based on requirement description using
dynamic search.
Args:
requirement_description (str): Natural language description of
the operator requirement
limit (int): Maximum number of example operators to return
(default: 2)
Returns:
ToolResponse: Example operator code and test files based on
the requirement
"""
global DATA_JUICER_PATH
if DATA_JUICER_PATH is None:
return ToolResponse(
content=[
TextBlock(
type="text",
text=(
"DATA_JUICER_PATH is not configured. Please ask the "
"user to provide the DATA_JUICER_PATH"
),
),
],
)
try:
@@ -108,49 +123,56 @@ async def get_operator_example(
# Query relevant operators using the requirement description
# Use retrieval mode from environment variable if set
retrieval_mode = os.environ.get("RETRIEVAL_MODE", "auto")
tool_names = await retrieve_ops(
requirement_description,
limit=limit,
mode=retrieval_mode,
)
if not tool_names:
return ToolResponse(
content=[
TextBlock(
type="text",
text=(
"No relevant operators found for requirement: "
f"{requirement_description}\n"
"Please try with more specific keywords or "
"check if DATA_JUICER_PATH is properly "
"configured."
),
),
],
)
comb_content = (
f"# Dynamic Operator Examples for: {requirement_description}\n\n"
)
comb_content += (
f"Found {len(tool_names)} relevant operators (limit: {limit})\n\n"
)
# Process each found operator
for i, tool_name in enumerate(tool_names[:limit]):
comb_content += f"## {i+1}. {tool_name}\n\n"
op_type = tool_name.split("_")[-1]
operator_path = f"data_juicer/ops/{op_type}/{tool_name}.py"
# Try to find operator source file
full_path = os.path.join(DATA_JUICER_PATH, operator_path)
if os.path.exists(full_path):
with open(full_path, "r", encoding="utf-8") as f:
operator_code = f.read()
comb_content += "### Source Code\n"
comb_content += "```python\n"
comb_content += operator_code
comb_content += "\n```\n\n"
else:
comb_content += "**Note:** Source code file not found for"
comb_content += f" `{tool_name}`.\n\n"
test_path = f"tests/ops/{op_type}/test_{tool_name}.py"
@@ -159,36 +181,43 @@ async def get_operator_example(
with open(full_test_path, "r", encoding="utf-8") as f: with open(full_test_path, "r", encoding="utf-8") as f:
test_code = f.read() test_code = f.read()
combined_content += f"### Test Code\n" comb_content += "### Test Code\n"
combined_content += f"**File Path:** `{test_path}`\n\n" comb_content += f"**File Path:** `{test_path}`\n\n"
combined_content += "```python\n" comb_content += "```python\n"
combined_content += test_code comb_content += test_code
combined_content += "\n```\n\n" comb_content += "\n```\n\n"
else: else:
combined_content += ( comb_content += (
f"**Note:** Test file not found for `{tool_name}`.\n\n" f"**Note:** Test file not found for `{tool_name}`.\n\n"
) )
combined_content += "---\n\n" comb_content += "---\n\n"
return ToolResponse(content=[TextBlock(type="text", text=combined_content)]) return ToolResponse(
content=[TextBlock(type="text", text=comb_content)],
)
except Exception as e: except Exception as e:
return ToolResponse( return ToolResponse(
content=[ content=[
TextBlock( TextBlock(
type="text", type="text",
text=f"Error occurred while getting operator examples: {str(e)}\n" text=(
f"Please check the requirement description and try again.", "Error occurred while getting operator examples: "
) f"{str(e)}\n"
] "Please check the requirement description and try "
"again."
),
),
],
) )
def configure_data_juicer_path(data_juicer_path: str) -> ToolResponse: def configure_data_juicer_path(data_juicer_path: str) -> ToolResponse:
"""Configure DataJuicer path. """Configure DataJuicer path.
If the user provides the data_juicer_path, please use this method to configure it. If the user provides the data_juicer_path, please use this method to
configure it.
Args: Args:
data_juicer_path (str): Path to DataJuicer installation data_juicer_path (str): Path to DataJuicer installation
@@ -196,8 +225,9 @@ def configure_data_juicer_path(data_juicer_path: str) -> ToolResponse:
Returns: Returns:
ToolResponse: Configuration result ToolResponse: Configuration result
""" """
global DATA_JUICER_PATH global DATA_JUICER_PATH
data_juicer_path = os.path.expanduser(data_juicer_path) data_juicer_path = os.path.expanduser(data_juicer_path)
try: try:
@@ -206,9 +236,12 @@ def configure_data_juicer_path(data_juicer_path: str) -> ToolResponse:
content=[ content=[
TextBlock( TextBlock(
type="text", type="text",
text=f"Specified DataJuicer path does not exist: {data_juicer_path}", text=(
) "Specified DataJuicer path does not exist: "
] f"{data_juicer_path}"
),
),
],
) )
# Update global DATA_JUICER_PATH # Update global DATA_JUICER_PATH
@@ -218,9 +251,12 @@ def configure_data_juicer_path(data_juicer_path: str) -> ToolResponse:
content=[ content=[
TextBlock( TextBlock(
type="text", type="text",
text=f"DataJuicer path has been updated to: {DATA_JUICER_PATH}", text=(
) "DataJuicer path has been updated to: ",
] f"{DATA_JUICER_PATH}",
),
),
],
) )
except Exception as e: except Exception as e:
@@ -228,7 +264,10 @@ def configure_data_juicer_path(data_juicer_path: str) -> ToolResponse:
content=[ content=[
TextBlock( TextBlock(
type="text", type="text",
text=f"Error occurred while configuring DataJuicer path: {str(e)}", text=(
) "Error occurred while configuring DataJuicer path: "
] f"{str(e)}"
),
),
],
) )
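The `configure_data_juicer_path` flow above expands `~` before checking that the directory exists. A minimal standalone sketch of that validation order; the helper name and `(path, message)` return shape here are illustrative, not the tool's actual API:

```python
import os


def validate_dj_path(data_juicer_path: str):
    """Sketch: expand '~' first, then verify the directory exists."""
    expanded = os.path.expanduser(data_juicer_path)
    if not os.path.exists(expanded):
        return None, f"Specified DataJuicer path does not exist: {expanded}"
    return expanded, f"DataJuicer path has been updated to: {expanded}"


ok_path, ok_msg = validate_dj_path("~")
bad_path, bad_msg = validate_dj_path("/no/such/dj/dir")
```

Expanding before the existence check is what lets users pass `~/data-juicer` and still get a correct error message for the resolved absolute path.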


@@ -1,14 +1,19 @@
+# -*- coding: utf-8 -*-
 import os
 import os.path as osp
 import json
 import asyncio
+from typing import Any

 from agentscope.message import TextBlock
 from agentscope.tool import ToolResponse

 from .op_manager.op_retrieval import retrieve_ops

 # Load tool information for formatting
-TOOLS_INFO_PATH = osp.join(osp.dirname(__file__), "op_manager", "dj_funcs_all.json")
+TOOLS_INFO_PATH = osp.join(
+    osp.dirname(__file__),
+    "op_manager",
+    "dj_funcs_all.json",
+)


 def _load_tools_info():
     """Load tools information from JSON file or create it if not exists"""
@@ -17,30 +22,35 @@ def _load_tools_info():
             return json.loads(f.read())
     else:
         from .op_manager.create_dj_func_info import dj_func_info

         with open(TOOLS_INFO_PATH, "w", encoding="utf-8") as f:
             json.dump(dj_func_info, f)
         return dj_func_info


 def _format_tool_names_to_class_entries(tool_names):
     """Convert tool names list to formatted class entries string"""
     if not tool_names:
         return ""

     tools_info = _load_tools_info()

     # Create a mapping from class_name to tool info for quick lookup
-    tools_map = {tool['class_name']: tool for tool in tools_info}
+    tools_map = {tool["class_name"]: tool for tool in tools_info}

     formatted_entries = []
     for i, tool_name in enumerate(tool_names):
         if tool_name in tools_map:
             tool_info = tools_map[tool_name]
-            class_entry = f"{i+1}. {tool_info['class_name']}: {tool_info['class_desc']}"
+            class_entry = (
+                f"{i+1}. {tool_info['class_name']}: {tool_info['class_desc']}"
+            )
             class_entry += "\n" + tool_info["arguments"]
             formatted_entries.append(class_entry)

     return "\n".join(formatted_entries)


 async def query_dj_operators(query: str, limit: int = 20) -> ToolResponse:
     """Query DataJuicer operators by natural language description.
@@ -52,26 +62,33 @@ async def query_dj_operators(query: str, limit: int = 20) -> ToolResponse:
         limit (int): Maximum number of operators to return (default: 20)

     Returns:
-        ToolResponse: Tool response containing matched operators with names, descriptions, and parameters
+        ToolResponse: Tool response containing matched operators with names,
+        descriptions, and parameters
     """
     try:
         # Retrieve operator names using existing functionality with limit
         # Use retrieval mode from environment variable if set
         retrieval_mode = os.environ.get("RETRIEVAL_MODE", "auto")
-        tool_names = await retrieve_ops(query, limit=limit, mode=retrieval_mode)
+        tool_names = await retrieve_ops(
+            query,
+            limit=limit,
+            mode=retrieval_mode,
+        )

         if not tool_names:
             return ToolResponse(
                 content=[
                     TextBlock(
                         type="text",
-                        text=f"No matching DataJuicer operators found for query: {query}\n"
-                        f"Suggestions:\n"
-                        f"1. Use more specific keywords like 'text filter', 'image processing'\n"
-                        f"2. Check spelling and try alternative terms\n"
-                        f"3. Try English keywords for better matching",
-                    )
+                        text="No matching DataJuicer operators found for "
+                        f"query: {query}\n"
+                        "Suggestions:\n"
+                        "1. Use more specific keywords like 'text filter', "
+                        "'image processing'\n"
+                        "2. Check spelling and try alternative terms\n"
+                        "3. Try English keywords for better matching",
+                    ),
                 ],
             )
@@ -79,7 +96,7 @@ async def query_dj_operators(query: str, limit: int = 20) -> ToolResponse:
         retrieved_operators = _format_tool_names_to_class_entries(tool_names)

         # Format response
-        result_text = f"🔍 DataJuicer Operator Query Results\n"
+        result_text = "🔍 DataJuicer Operator Query Results\n"
         result_text += f"Query: {query}\n"
         result_text += f"Limit: {limit} operators\n"
         result_text += f"{'='*50}\n\n"
@@ -90,7 +107,7 @@ async def query_dj_operators(query: str, limit: int = 20) -> ToolResponse:
                 TextBlock(
                     type="text",
                     text=result_text,
-                )
+                ),
             ],
         )
@@ -101,7 +118,7 @@ async def query_dj_operators(query: str, limit: int = 20) -> ToolResponse:
                     type="text",
                     text=f"Error querying DataJuicer operators: {str(e)}\n"
                    f"Please verify query parameters and retry.",
-                )
+                ),
             ],
         )
@@ -109,10 +126,11 @@ async def query_dj_operators(query: str, limit: int = 20) -> ToolResponse:
 async def execute_safe_command(
     command: str,
     timeout: int = 300,
+    **kwargs: Any,
 ) -> ToolResponse:
-    """Execute safe commands including DataJuicer commands and other safe system commands.
-    Returns the return code, standard output and error within <returncode></returncode>,
+    """Execute safe commands including DataJuicer commands and other safe
+    system commands.
+    Returns the return code, standard output and error within
+    <returncode></returncode>,
     <stdout></stdout> and <stderr></stderr> tags.

     Args:
@@ -131,39 +149,67 @@ async def execute_safe_command(
         The tool response containing the return code, standard output, and
         standard error of the executed command.
     """
     # Security check: only allow safe commands
     command_stripped = command.strip()

     # Define allowed command prefixes for security
     allowed_commands = [
         # DataJuicer commands
-        'dj-process', 'dj-analyze',
+        "dj-process",
+        "dj-analyze",
         # File system operations
-        'mkdir', 'ls', 'pwd', 'cat', 'echo', 'cp', 'mv', 'rm',
+        "mkdir",
+        "ls",
+        "pwd",
+        "cat",
+        "echo",
+        "cp",
+        "mv",
+        "rm",
         # Text processing
-        'grep', 'head', 'tail', 'wc', 'sort', 'uniq',
+        "grep",
+        "head",
+        "tail",
+        "wc",
+        "sort",
+        "uniq",
         # Archive operations
-        'tar', 'zip', 'unzip',
+        "tar",
+        "zip",
+        "unzip",
         # Information commands
-        'which', 'whoami', 'date', 'find',
+        "which",
+        "whoami",
+        "date",
+        "find",
         # Python commands
-        'python', 'python3', 'pip', 'uv'
+        "python",
+        "python3",
+        "pip",
+        "uv",
     ]

     # Check if command starts with any allowed command
     command_allowed = False
     for allowed_cmd in allowed_commands:
         if command_stripped.startswith(allowed_cmd):
             # Additional security checks for potentially dangerous commands
-            if allowed_cmd in ['rm', 'mv'] and ('/' in command_stripped or '..' in command_stripped):
+            if allowed_cmd in ["rm", "mv"] and (
+                "/" in command_stripped or ".." in command_stripped
+            ):
                 # Prevent dangerous path operations
                 continue
             command_allowed = True
             break

     if not command_allowed:
-        error_msg = f"Error: Command not allowed for security reasons. Allowed commands: {', '.join(allowed_commands)}. Received command: {command}"
+        error_msg = (
+            "Error: Command not allowed for security reasons. "
+            "Allowed commands: "
+            f"{', '.join(allowed_commands)}. "
+            f"Received command: {command}"
+        )
         return ToolResponse(
             content=[
                 TextBlock(
@@ -193,7 +239,7 @@ async def execute_safe_command(
     except asyncio.TimeoutError:
         stderr_suffix = (
-            f"TimeoutError: The command execution exceeded "
+            "TimeoutError: The command execution exceeded "
             f"the timeout of {timeout} seconds."
         )
         returncode = -1
@@ -221,4 +267,4 @@ async def execute_safe_command(
                 ),
             ),
         ],
     )
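The security gate in `execute_safe_command` reduces to a prefix allowlist with an extra guard that rejects `rm`/`mv` when the command mentions a path separator or parent reference. A condensed, runnable sketch of that check (abbreviated allowlist; `is_command_allowed` is an illustrative name, not the module's API):

```python
# Abbreviated version of the tool's allowlist.
ALLOWED = ["dj-process", "dj-analyze", "ls", "cat", "rm", "mv", "python"]


def is_command_allowed(command: str) -> bool:
    """Return True only for commands starting with an allowed prefix.

    Destructive prefixes (rm/mv) are additionally rejected when the
    command contains '/' or '..', mirroring the guard above.
    """
    stripped = command.strip()
    for prefix in ALLOWED:
        if stripped.startswith(prefix):
            if prefix in ("rm", "mv") and (
                "/" in stripped or ".." in stripped
            ):
                continue  # dangerous path operation, keep scanning
            return True
    return False
```

Note that plain `startswith` matching is deliberately loose: `rmdir tmp` also matches the `rm` prefix, which mirrors the behavior of the original check.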


@@ -1,11 +1,16 @@
+# -*- coding: utf-8 -*-
 import json
 import os
 import logging
-from typing import Optional, List
+from typing import Optional
 import string

 from agentscope.tool import Toolkit
-from agentscope.mcp import HttpStatefulClient, HttpStatelessClient, StdIOStatefulClient
+from agentscope.mcp import (
+    HttpStatefulClient,
+    HttpStatelessClient,
+    StdIOStatefulClient,
+)

 # Configure logging
 logging.basicConfig(level=logging.INFO)
@@ -13,6 +18,7 @@ logger = logging.getLogger(__name__)

 root_path = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))


 def _load_config(config_path: str) -> dict:
     """Load MCP configuration from file"""
     try:
@@ -23,13 +29,15 @@ def _load_config(config_path: str) -> dict:
             return config
         else:
             logger.warning(
-                f"Configuration file {config_path} not found, using default settings"
+                f"Configuration file {config_path} not found, "
+                "using default settings",
             )
             return _create_default_config()
     except Exception as e:
         logger.error(f"Error loading configuration: {e}")
         return _create_default_config()


 def _create_default_config() -> dict:
     """Create default configuration"""
     return {
@@ -38,10 +46,11 @@ def _create_default_config() -> dict:
                 "command": "python",
                 "args": ["/home/test/data_juicer/tools/DJ_mcp_recipe_flow.py"],
                 "env": {"SERVER_TRANSPORT": "stdio"},
-            }
-        }
+            },
+        },
     }


 def _expand_env_vars(value: str) -> str:
     """Expand environment variables in configuration values"""
     if isinstance(value, str):
@@ -53,6 +62,7 @@ def _expand_env_vars(value: str) -> str:
         return value
     return value


 async def _create_clients(config: dict, toolkit: Toolkit):
     """Create MCP clients based on configuration"""
     server_configs = config.get("mcpServers", {})
@@ -88,33 +98,38 @@ async def _create_clients(config: dict, toolkit: Toolkit):
                 if stateful:
                     client = HttpStatefulClient(
-                        name=server_name, transport=transport, url=url
+                        name=server_name,
+                        transport=transport,
+                        url=url,
                     )
                     await client.connect()
                     await toolkit.register_mcp_client(client)
                 else:
                     client = HttpStatelessClient(
-                        name=server_name, transport=transport, url=url
+                        name=server_name,
+                        transport=transport,
+                        url=url,
                     )
                     await toolkit.register_mcp_client(client)
             else:
                 raise ValueError("Invalid server configuration")

             clients.append(client)
         except Exception as e:
             if "Invalid server configuration" in str(e):
                 raise e
             logger.error(f"Failed to create client {server_name}: {e}")

     return clients


 async def get_mcp_toolkit(config_path: Optional[str] = None) -> Toolkit:
     """Get toolkit with all MCP tools registered"""
     config_path = config_path or root_path + "/configs/mcp_config.json"
     config = _load_config(config_path)
     toolkit = Toolkit()
     clients = await _create_clients(config, toolkit)
     return toolkit, clients
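The body of `_expand_env_vars` falls outside the hunks above. Given the module's `string` import, one plausible implementation expands `${VAR}` placeholders with `string.Template.safe_substitute` over `os.environ`; this is an assumption about the repository's actual code, shown only to illustrate the configuration-expansion step:

```python
import os
import string

# Demo environment value, set here so the example is self-contained.
os.environ["DJ_DEMO_HOME"] = "/opt/dj"


def expand_env_vars(value):
    """Replace ${VAR} placeholders in strings with environment values.

    safe_substitute leaves unknown placeholders untouched instead of
    raising, so a partially configured environment still loads.
    Non-string values pass through unchanged.
    """
    if isinstance(value, str):
        return string.Template(value).safe_substitute(os.environ)
    return value


result = expand_env_vars("${DJ_DEMO_HOME}/configs")
```

`safe_substitute` (rather than `substitute`) is the key design choice here: a missing variable degrades to a literal placeholder instead of crashing config loading.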


@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
 import inspect

 from data_juicer.tools.op_search import OPSearcher
@@ -7,7 +8,11 @@ all_ops = searcher.search()

 dj_func_info = []
 for i, op in enumerate(all_ops):
-    class_entry = {"index": i, "class_name": op["name"], "class_desc": op["desc"]}
+    class_entry = {
+        "index": i,
+        "class_name": op["name"],
+        "class_desc": op["desc"],
+    }
     param_desc = op["param_desc"]
     param_desc_map = {}
     args = ""
@@ -27,7 +32,8 @@ for i, op in enumerate(all_ops):
         ):
             continue
         if param_name in param_desc_map:
-            args += f"    {param_name} ({param.annotation}): {param_desc_map[param_name]}\n"
+            args += f"    {param_name} ({param.annotation}):"
+            args += f" {param_desc_map[param_name]}\n"
         else:
             args += f"    {param_name} ({param.annotation})\n"
     class_entry["arguments"] = args
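The loop above renders each operator's parameters from its signature via `inspect`. A self-contained sketch of the same formatting, with a hypothetical `demo_op` standing in for a real Data-Juicer operator and `format_arguments` as an illustrative helper name:

```python
import inspect


def format_arguments(func, param_desc_map):
    """Render parameters in the style built by create_dj_func_info."""
    args = ""
    for param_name, param in inspect.signature(func).parameters.items():
        # Skip bookkeeping parameters, as the original loop does.
        if param_name in ("self", "args", "kwargs"):
            continue
        if param_name in param_desc_map:
            args += f"    {param_name} ({param.annotation}):"
            args += f" {param_desc_map[param_name]}\n"
        else:
            args += f"    {param_name} ({param.annotation})\n"
    return args


def demo_op(text: str, min_len: int = 10):
    """Hypothetical operator used only for this illustration."""
    return text


doc = format_arguments(demo_op, {"min_len": "minimum sample length"})
```

Because `param.annotation` holds the annotation object itself, an annotated `str` parameter renders as `<class 'str'>` in the generated argument listing.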


@@ -1,8 +1,8 @@
+# -*- coding: utf-8 -*-
 import os
 import os.path as osp
 import json
 import logging
-import pickle
 import hashlib
 import time
 from typing import Optional
@@ -18,17 +18,22 @@ _cached_vector_store: Optional[FAISS] = None
 _cached_tools_info: Optional[list] = None
 _cached_file_hash: Optional[str] = None

-RETRIEVAL_PROMPT = """You are a professional tool retrieval assistant responsible for filtering the top {limit} most relevant tools from a large tool library based on user requirements. Execute the following steps:
+RETRIEVAL_PROMPT = """You are a professional tool retrieval assistant
+responsible for filtering the top {limit} most relevant tools from a large
+tool library based on user requirements. Execute the following steps:

 # Requirement Analysis
-Carefully read the user's [requirement description], extract core keywords, functional objectives, usage scenarios, and technical requirements (such as real-time performance, data types, industry domains, etc.).
+Carefully read the user's [requirement description], extract core keywords,
+functional objectives, usage scenarios, and technical requirements
+(such as real-time performance, data types, industry domains, etc.).

 # Tool Matching
 Perform multi-dimensional matching based on the following tool attributes:
 - Tool name and functional description
 - Supported input/output formats
 - Applicable industry or scenario tags
-- Technical implementation principles (API, local deployment, AI model types)
+- Technical implementation principles
+(API, local deployment, AI model types)
 - Relevance ranking

 # Use weighted scoring mechanism (example weights):
@@ -59,7 +64,8 @@ RETRIEVAL_PROMPT = """You are a professional tool retrieval assistant responsibl
     "key_match": ["Matching keywords/features"]
   }}
 ]
-Output strictly in JSON array format, and only output the JSON array format tool list.
+Output strictly in JSON array format, and only output the JSON array format
+tool list.
 """
@@ -96,9 +102,15 @@ async def retrieve_ops_lm(user_query, limit=20):
     else:
         from create_dj_func_info import dj_func_info

-        project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
+        project_root = os.path.abspath(
+            os.path.join(os.path.dirname(__file__), ".."),
+        )
-        with open(os.path.join(project_root, TOOLS_INFO_PATH), "w") as f:
+        with open(
+            os.path.join(project_root, TOOLS_INFO_PATH),
+            "w",
+            encoding="utf-8",
+        ) as f:
             f.write(json.dumps(dj_func_info))

     tool_descriptions = [
@@ -123,15 +135,13 @@ async def retrieve_ops_lm(user_query, limit=20):
     user_prompt = (
         retrieval_prompt_with_limit
-        + """
+        + f"""
 User requirement description:
 {user_query}

 Available tools:
 {tools_string}
-""".format(
-            user_query=user_query, tools_string=tools_string
-        )
+"""
     )

     msgs = [
@@ -191,13 +201,11 @@ def _load_cached_index() -> bool:
     index_path = osp.join(VECTOR_INDEX_CACHE_PATH, "faiss_index")
     metadata_path = osp.join(VECTOR_INDEX_CACHE_PATH, "metadata.json")

-    if not all(
-        os.path.exists(p) for p in [index_path, metadata_path]
-    ):
+    if not all(os.path.exists(p) for p in [index_path, metadata_path]):
         return False

     # Check if cached index matches current tools info file
-    with open(metadata_path, "r") as f:
+    with open(metadata_path, "r", encoding="utf-8") as f:
         metadata = json.load(f)

     cached_hash = metadata.get("tools_info_hash", "")
@@ -215,7 +223,9 @@ def _load_cached_index() -> bool:
     )

     _cached_vector_store = FAISS.load_local(
-        index_path, embeddings, allow_dangerous_deserialization=True
+        index_path,
+        embeddings,
+        allow_dangerous_deserialization=True,
     )
     _cached_file_hash = cached_hash
@@ -244,8 +254,11 @@ def _save_cached_index():
     _cached_vector_store.save_local(index_path)

     # Save metadata
-    metadata = {"tools_info_hash": _cached_file_hash, "created_at": time.time()}
-    with open(metadata_path, "w") as f:
+    metadata = {
+        "tools_info_hash": _cached_file_hash,
+        "created_at": time.time(),
+    }
+    with open(metadata_path, "w", encoding="utf-8") as f:
         json.dump(metadata, f)

     logging.info("Successfully saved vector index to cache")
@@ -261,16 +274,23 @@ def _build_vector_index():
     with open(TOOLS_INFO_PATH, "r", encoding="utf-8") as f:
         tools_info = json.loads(f.read())

-    tool_descriptions = [f"{t['class_name']}: {t['class_desc']}" for t in tools_info]
+    tool_descriptions = [
+        f"{t['class_name']}: {t['class_desc']}" for t in tools_info
+    ]

     from langchain_community.embeddings import DashScopeEmbeddings

     embeddings = DashScopeEmbeddings(
-        dashscope_api_key=os.environ.get("DASHSCOPE_API_KEY"), model="text-embedding-v1"
+        dashscope_api_key=os.environ.get("DASHSCOPE_API_KEY"),
+        model="text-embedding-v1",
     )
     metadatas = [{"index": i} for i in range(len(tool_descriptions))]
-    vector_store = FAISS.from_texts(tool_descriptions, embeddings, metadatas=metadatas)
+    vector_store = FAISS.from_texts(
+        tool_descriptions,
+        embeddings,
+        metadatas=metadatas,
+    )

     # Cache the results
     _cached_vector_store = vector_store
@@ -283,7 +303,7 @@ def _build_vector_index():

 def retrieve_ops_vector(user_query, limit=20):
-    """Tool retrieval using vector search with caching - returns list of tool names"""
+    """Tool retrieval using vector search with caching"""
     global _cached_vector_store

     # Try to load from cache first
@@ -292,7 +312,10 @@ def retrieve_ops_vector(user_query, limit=20):
         _build_vector_index()

     # Perform similarity search
-    retrieved_tools = _cached_vector_store.similarity_search(user_query, k=limit)
+    retrieved_tools = _cached_vector_store.similarity_search(
+        user_query,
+        k=limit,
+    )
     retrieved_indices = [doc.metadata["index"] for doc in retrieved_tools]

     with open(TOOLS_INFO_PATH, "r", encoding="utf-8") as f:
@@ -307,7 +330,11 @@ def retrieve_ops_vector(user_query, limit=20):
     return tool_names


-async def retrieve_ops(user_query: str, limit: int = 20, mode: str = "auto") -> list:
+async def retrieve_ops(
+    user_query: str,
+    limit: int = 20,
+    mode: str = "auto",
+) -> list:
     """
     Tool retrieval with configurable mode
@@ -322,59 +349,56 @@ async def retrieve_ops(user_query: str, limit: int = 20, mode: str = "auto") ->
     Returns:
         List of tool names
     """
-    if mode == "llm":
+    if mode in ("llm", "auto"):
         try:
             return await retrieve_ops_lm(user_query, limit=limit)
         except Exception as e:
             logging.error(f"LLM retrieval failed: {str(e)}")
-            return []
+            if mode != "auto":
+                return []
-    elif mode == "vector":
+    if mode in ("vector", "auto"):
         try:
             return retrieve_ops_vector(user_query, limit=limit)
         except Exception as e:
             logging.error(f"Vector retrieval failed: {str(e)}")
             return []
-    elif mode == "auto":
-        try:
-            return await retrieve_ops_lm(user_query, limit=limit)
-        except Exception as e:
-            import traceback
-            print(traceback.format_exc())
-            try:
-                return retrieve_ops_vector(user_query, limit=limit)
-            except Exception as fallback_e:
-                logging.error(
-                    f"Tool retrieval failed: {str(e)}, fallback retrieval also failed: {str(fallback_e)}"
-                )
-                return []
     else:
-        raise ValueError(f"Invalid mode: {mode}. Must be 'llm', 'vector', or 'auto'")
+        raise ValueError(
+            f"Invalid mode: {mode}. Must be 'llm', 'vector', or 'auto'",
+        )


 if __name__ == "__main__":
     import asyncio

-    user_query = (
-        "Clean special characters from text and filter samples with excessive length. Mask sensitive information and filter unsafe content including adult/terror-related terms."
-        + "Additionally, filter out small images, perform image tagging, and remove duplicate images."
+    query = (
+        "Clean special characters from text and filter samples with "
+        + "excessive length. Mask sensitive information and filter "
+        + "unsafe content including adult/terror-related terms."
+        + "Additionally, filter out small images, perform image "
+        + "tagging, and remove duplicate images."
     )

     # Test different modes
     print("=== Testing LLM mode ===")
-    tool_names_llm = asyncio.run(retrieve_ops(user_query, limit=10, mode="llm"))
+    tool_names_llm = asyncio.run(
+        retrieve_ops(query, limit=10, mode="llm"),
+    )
     print("Retrieved tool names (LLM):")
     print(tool_names_llm)

     print("\n=== Testing Vector mode ===")
-    tool_names_vector = asyncio.run(retrieve_ops(user_query, limit=10, mode="vector"))
+    tool_names_vector = asyncio.run(
+        retrieve_ops(query, limit=10, mode="vector"),
+    )
     print("Retrieved tool names (Vector):")
     print(tool_names_vector)

     print("\n=== Testing Auto mode (default) ===")
-    tool_names_auto = asyncio.run(retrieve_ops(user_query, limit=10, mode="auto"))
+    tool_names_auto = asyncio.run(
+        retrieve_ops(query, limit=10, mode="auto"),
+    )
     print("Retrieved tool names (Auto):")
     print(tool_names_auto)
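The rewritten `retrieve_ops` collapses the old three-branch dispatch into two guarded attempts: `auto` tries the LLM retriever and silently falls through to vector search, while `llm` and `vector` fail closed with an empty list. The control flow can be sketched with stub retrievers (all names here are illustrative, not the module's API):

```python
import logging


def retrieve_with_fallback(query, primary, fallback, mode="auto"):
    """Mirror of the consolidated dispatch in retrieve_ops."""
    if mode in ("llm", "auto"):
        try:
            return primary(query)
        except Exception as e:
            logging.error(f"LLM retrieval failed: {e}")
            if mode != "auto":
                return []
            # mode == "auto": fall through to vector retrieval
    if mode in ("vector", "auto"):
        try:
            return fallback(query)
        except Exception as e:
            logging.error(f"Vector retrieval failed: {e}")
            return []
    raise ValueError(f"Invalid mode: {mode}")


def failing_retriever(_query):
    raise RuntimeError("no API key")


def vector_retriever(_query):
    return ["text_length_filter"]
```

The subtle point is the `if mode != "auto"` guard: it is what lets `auto` continue into the second block after an LLM failure, while a pure `llm` request returns an empty result instead.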


@@ -7,7 +7,9 @@ from agentscope.tool import ToolResponse

 def agent_to_tool(
-    agent: AgentBase, tool_name: str = None, description: str = None
+    agent: AgentBase,
+    tool_name: str = None,
+    description: str = None,
 ) -> Callable:
     """
     Convert any agent to a tool function that can be registered in toolkit.
@@ -15,10 +17,12 @@ def agent_to_tool(
     Args:
         agent: The agent instance to convert
         tool_name: Optional custom tool name (defaults to agent.name)
-        description: Optional tool description (defaults to agent's docstring or sys_prompt)
+        description: Optional tool description
+            (defaults to agent's docstring or sys_prompt)

     Returns:
-        A tool function that can be registered with toolkit.register_tool_function()
+        A tool function that can be registered with
+        toolkit.register_tool_function()
     """
     # Get tool name and description
     if tool_name is None:
@@ -30,8 +34,6 @@ def agent_to_tool(
             description = agent.__doc__.strip()
         elif hasattr(agent, "sys_prompt"):
             description = f"Agent: {agent.sys_prompt[:100]}..."
-        elif hasattr(agent, "_sys_prompt"):
-            description = f"Agent: {agent._sys_prompt[:100]}..."
         else:
             description = f"Tool function for {tool_name}"
@@ -56,7 +58,8 @@ def agent_to_tool(
     # Set function name and docstring
     tool_function.__name__ = f"call_{tool_name.lower().replace(' ', '_')}"
     tool_function.__doc__ = (
-        f"{description}\n\nArgs:\n    task (str): The task for {tool_name} to handle"
+        f"{description}\n\nArgs:"
+        + "\n    task (str): The task for {tool_name} to handle"
     )

     return tool_function
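The dynamic naming and docstring assignment at the end of `agent_to_tool` can be exercised in isolation. This condensed sketch keeps only that metadata logic, with `make_tool_function` as an illustrative stand-in for the agent-backed wrapper (and it uses an f-string for the second docstring line, unlike the hunk above, so `{tool_name}` is actually interpolated):

```python
def make_tool_function(tool_name: str, description: str):
    """Sketch of the tool-wrapping metadata logic in agent_to_tool."""

    async def tool_function(task: str):
        # Placeholder body; the real wrapper forwards `task` to the agent.
        return f"{tool_name} handled: {task}"

    # Derive a snake_case callable name from the human-readable tool name.
    tool_function.__name__ = f"call_{tool_name.lower().replace(' ', '_')}"
    tool_function.__doc__ = (
        f"{description}\n\nArgs:"
        f"\n    task (str): The task for {tool_name} to handle"
    )
    return tool_function


fn = make_tool_function("Deep Research Agent", "Runs multi-step research.")
```

Toolkits that introspect `__name__` and `__doc__` to build an LLM-facing tool schema see the rewritten metadata rather than the generic inner function.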


@@ -1,15 +1,8 @@
 # -*- coding: utf-8 -*-
 import os
-from unittest.mock import AsyncMock, Mock, patch
-import sys
-from pathlib import Path
-
-root_path = Path(__file__).parent.parent
-sys.path.insert(0, str(root_path))
-sys.path.insert(0, str(Path(root_path) / "data_juicer_agent"))
-
 import pytest
+from unittest.mock import AsyncMock, Mock, patch

 from agentscope.agent import ReActAgent
 from agentscope.model import DashScopeChatModel
 from agentscope.tool import Toolkit
@@ -81,10 +74,10 @@ def mock_mcp_client():
 @pytest.fixture
 def mock_agent(
-    mock_model,
-    mock_formatter,
-    mock_toolkit,
-    mock_memory,
+    mock_model,  # pylint: disable=redefined-outer-name
+    mock_formatter,  # pylint: disable=redefined-outer-name
+    mock_toolkit,  # pylint: disable=redefined-outer-name
+    mock_memory,  # pylint: disable=redefined-outer-name
 ):
     """Create a mocked ReActAgent instance"""
     agent = Mock(spec=ReActAgent)
@@ -101,7 +94,11 @@ def mock_agent(
 class TestDataJuicerAgent:
     """Test suite for the data_juicer_agent functionality"""

-    def create_named_mock_agent(self, name, mock_agent, *args, **kwargs):
+    def named_mock_agent(
+        self,
+        name,
+        mock_agent,  # pylint: disable=redefined-outer-name
+    ):
         """Create a named mock agent for testing"""
         agent_instance = Mock(spec=ReActAgent)
         agent_instance.model = mock_agent.model
@@ -112,7 +109,19 @@ class TestDataJuicerAgent:
         agent_instance.name = name
         return agent_instance

-    async def mock_user_func(self, msg=None):
+    def _named_mock_agent_side_effect(
+        self,
+        mock_agent,  # pylint: disable=redefined-outer-name
+    ):
+        """Side effect function for creating named mock agents"""
+        return lambda name, *args, **kwargs: self.named_mock_agent(
+            name,
+            mock_agent,
+            *args,
+            **kwargs,
+        )
+
+    async def mock_user_func(self):
         return Msg("user", "exit", role="user")

     def test_dj_toolkit_initialization(self):
@@ -139,7 +148,9 @@ class TestDataJuicerAgent:
         assert dj_dev_toolkit.tools.get("write_text_file") is not None
         assert dj_dev_toolkit.tools.get("get_basic_files") is not None
         assert dj_dev_toolkit.tools.get("get_operator_example") is not None
-        assert dj_dev_toolkit.tools.get("configure_data_juicer_path") is not None
+        assert (
+            dj_dev_toolkit.tools.get("configure_data_juicer_path") is not None
+        )

         # Verify tool list contains expected tools
         expected_tools = [
@@ -154,8 +165,11 @@ class TestDataJuicerAgent:
         assert tool in dj_dev_tools

     @pytest.mark.asyncio
-    async def test_mcp_tools_list(self, mock_mcp_client):
-        """Test MCP tools list contains expected tools and MCP client binding"""
+    async def test_mcp_tools_list(
+        self,
+        mock_mcp_client,  # pylint: disable=redefined-outer-name
+    ):
+        """Test MCP tools list contains expected tools"""
         with patch(
             "agentscope.mcp.HttpStatefulClient",
             return_value=mock_mcp_client,
@@ -171,10 +185,10 @@ class TestDataJuicerAgent:
     @pytest.mark.asyncio
     async def test_agent_initialization(
         self,
-        mock_model,
-        mock_formatter,
-        mock_toolkit,
-        mock_memory,
+        mock_model,  # pylint: disable=redefined-outer-name
+        mock_formatter,  # pylint: disable=redefined-outer-name
+        mock_toolkit,  # pylint: disable=redefined-outer-name
+        mock_memory,  # pylint: disable=redefined-outer-name
     ):
         """Test ReActAgent initialization"""
         with patch.dict(os.environ, {"DASHSCOPE_API_KEY": "test_key"}):
@@ -198,7 +212,11 @@ class TestDataJuicerAgent:
         assert isinstance(agent, ReActAgent)

     @pytest.mark.asyncio
-    async def test_main_with_multiple_agents_loading(self, mock_agent, mock_mcp_client):
+    async def test_main_with_multiple_agents_loading(
+        self,
+        mock_agent,  # pylint: disable=redefined-outer-name
+        mock_mcp_client,  # pylint: disable=redefined-outer-name
+    ):
         """Test main function loads multiple agents successfully"""
         with patch.dict(os.environ, {"DASHSCOPE_API_KEY": "test_key"}):
             mock_mcp_clients = [mock_mcp_client]
@@ -209,21 +227,20 @@ class TestDataJuicerAgent:
             ):
                 with patch(
                     "data_juicer_agent.main.create_agent",
-                    side_effect=lambda name, *args, **kwargs: self.create_named_mock_agent(
-                        name, mock_agent, *args, **kwargs
-                    ),
+                    side_effect=self._named_mock_agent_side_effect(mock_agent),
                 ) as mock_create_agent:
                     with patch(
-                        "data_juicer_agent.main.user", side_effect=self.mock_user_func
+                        "data_juicer_agent.main.user",
+                        side_effect=self.mock_user_func,
                     ):
                         await main(
                             use_studio=False,
                             available_agents=["dj", "dj_dev", "dj_mcp"],
                             retrieval_mode="auto",
                         )
-                # Validate multiple agents are correctly created (dj, dj_dev, dj_mcp, and router)
+                # Validate multiple agents are correctly created
+                # (dj, dj_dev, dj_mcp, and router)
                 assert mock_create_agent.call_count == 4
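
The test refactor above replaces an inline lambda passed as `patch(..., side_effect=...)` with a factory method that builds and returns the callable. The general pattern can be sketched as follows; `AgentMockFactory` and its method names are illustrative stand-ins, not the project's actual test helpers:

```python
# Sketch of the "side-effect factory" pattern: a helper method returns the
# callable instead of inlining a lambda at the patch() call site.
# Names here (AgentMockFactory, etc.) are illustrative only.
from unittest.mock import Mock


class AgentMockFactory:
    """Builds named mock agents that share one template's attributes."""

    def named_mock_agent(self, name, template):
        # Each mock agent copies the template's model but gets its own name.
        agent = Mock()
        agent.model = template.model
        agent.name = name
        return agent

    def named_mock_agent_side_effect(self, template):
        # The returned callable accepts (name, *args, **kwargs), matching
        # the signature of the patched create_agent, so it can be passed
        # directly as patch(..., side_effect=...).
        return lambda name, *args, **kwargs: self.named_mock_agent(
            name,
            template,
        )


template = Mock()
factory = AgentMockFactory()
make_agent = factory.named_mock_agent_side_effect(template)
agent = make_agent("dj_dev")
print(agent.name)  # dj_dev
print(agent.model is template.model)  # True
```

Extracting the lambda this way keeps the `patch(...)` call short enough for line-length linters and makes the mock-construction logic reusable across tests.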