add example of DataJuicer agent

42 README.md
@@ -80,6 +80,7 @@ It includes **agent deployment** and **secure sandboxed tool execution**, and ca
├── evaluation/
│   └── ace_bench/             # Benchmarks and evaluation tools
│
├── data_juicer_agent/         # Data processing multi-agent system
├── sample_template/           # Template for new sample contributions
└── README.md
```
@@ -88,22 +89,39 @@ It includes **agent deployment** and **secure sandboxed tool execution**, and ca

## 📌 Example List

| Category | Example Folder | Uses AgentScope | Uses AgentScope Runtime | Description |
|-------------------------|-------------------------------------------------------|-----------------|------------------------|---------------------------------------------------------------------------|
| **Data Processing** | data_juicer_agent/ | ✅ | ❌ | Multi-agent data processing with Data-Juicer |
| **Browser Use** | browser_use/agent_browser | ✅ | ❌ | Command-line browser automation using AgentScope |
| | browser_use/browser_use_fullstack_runtime | ✅ | ✅ | Full-stack browser automation with UI & sandbox |
| **Deep Research** | deep_research/agent_deep_research | ✅ | ❌ | Multi-agent research pipeline |
| | deep_research/qwen_langgraph_search_fullstack_runtime | ❌ | ✅ | Full-stack deep research app |
| **Games** | games/game_werewolves | ✅ | ❌ | Multi-agent roleplay game |
| **Conversational Apps** | conversational_agents/chatbot_fullstack_runtime | ✅ | ✅ | Chatbot application with frontend/backend |
| | conversational_agents/chatbot | ✅ | ❌ | |
| | conversational_agents/multiagent_conversation | ✅ | ❌ | Multi-agent dialogue scenario |
| | conversational_agents/multiagent_debate | ✅ | ❌ | Agents engaging in debates |
| **Evaluation** | evaluation/ace_bench | ✅ | ❌ | Benchmarks with ACE Bench |
| **Alias** | alias/ | ✅ | ✅ | Agent application running in a sandbox to solve diverse real-world problems |

------

## 🌟 Featured Examples

### DataJuicer Agent

A powerful multi-agent data processing system that leverages Data-Juicer's 200+ operators for intelligent data processing:

- **Intelligent Query**: Find suitable operators from 200+ data processing operators
- **Automated Pipeline**: Generate Data-Juicer YAML configurations from natural language
- **Custom Development**: Create domain-specific operators with AI assistance
- **Multiple Retrieval Modes**: LLM-based and vector-based operator matching
- **MCP Integration**: Native Model Context Protocol support

📖 **Documentation**: [English](data_juicer_agent/README.md) | [中文](data_juicer_agent/README_ZH.md)

------

## ℹ️ Getting Help

If you:
18 README_zh.md
@@ -80,6 +80,7 @@ AgentScope Runtime is a **comprehensive runtime framework** that mainly addresses deployment and
├── evaluation/
│   └── ace_bench/             # Benchmarks and evaluation tools
│
├── data_juicer_agent/         # Data processing multi-agent system
├── sample_template/           # Template for new sample contributions
└── README.md
```

@@ -90,6 +91,7 @@ AgentScope Runtime is a **comprehensive runtime framework** that mainly addresses deployment and

| Category | Example Folder | Uses AgentScope | Uses AgentScope Runtime | Description |
|-----------|-------------------------------------------------------|---------------|-----------------------|-------------------------|
| **Data Processing** | data_juicer_agent/ | ✅ | ❌ | Multi-agent data processing based on Data-Juicer |
| **Browser Use** | browser_use/agent_browser | ✅ | ❌ | Command-line browser automation based on AgentScope |
| | browser_use/browser_use_fullstack_runtime | ✅ | ✅ | Full-stack browser automation with UI and sandbox |
| **Deep Research** | deep_research/agent_deep_research | ✅ | ❌ | Multi-agent research pipeline |

@@ -104,6 +106,22 @@ AgentScope Runtime is a **comprehensive runtime framework** that mainly addresses deployment and

---

## 🌟 Featured Examples

### DataJuicer Agent

A powerful multi-agent data processing system that leverages Data-Juicer's 200+ operators for intelligent data processing:

- **Intelligent Query**: Find suitable operators from 200+ data processing operators
- **Automated Pipeline**: Generate Data-Juicer YAML configurations from natural-language descriptions
- **Custom Development**: Create domain-specific operators with AI assistance
- **Multiple Retrieval Modes**: LLM-based and vector-based operator matching
- **MCP Integration**: Native Model Context Protocol support

📖 **Documentation**: [English](data_juicer_agent/README.md) | [中文](data_juicer_agent/README_ZH.md)

---

## ℹ️ Getting Help

If you:
261 data_juicer_agent/README.md (new file)
@@ -0,0 +1,261 @@
# DataJuicer Agent

A multi-agent data processing system built on [AgentScope](https://github.com/modelscope/agentscope) and [Data-Juicer (DJ)](https://github.com/modelscope/data-juicer). This project demonstrates how to leverage the natural-language understanding of large language models so that non-expert users can easily harness Data-Juicer's powerful data processing capabilities.

## 📋 Table of Contents

- [📋 Table of Contents](#-table-of-contents)
- [What Does This Agent Do?](#what-does-this-agent-do)
- [Architecture](#architecture)
- [Quick Start](#quick-start)
  - [System Requirements](#system-requirements)
  - [Installation](#installation)
  - [Configuration](#configuration)
  - [Usage](#usage)
- [Agent Introduction](#agent-introduction)
  - [Data Processing Agent](#data-processing-agent)
  - [Code Development Agent (DJ Dev Agent)](#code-development-agent-dj-dev-agent)
- [Advanced Features](#advanced-features)
  - [Operator Retrieval](#operator-retrieval)
    - [Retrieval Modes](#retrieval-modes)
    - [Usage](#usage-1)
  - [MCP Agent](#mcp-agent)
    - [MCP Server Types](#mcp-server-types)
    - [Configuration](#configuration-1)
    - [Usage Methods](#usage-methods)
- [Feature Preview](#feature-preview)
  - [Data-Juicer Q\&A Agent (Demo Available)](#data-juicer-qa-agent-demo-available)
  - [Data Analysis and Visualization Agent (In Development)](#data-analysis-and-visualization-agent-in-development)
- [Troubleshooting](#troubleshooting)
  - [Common Issues](#common-issues)
  - [Optimization Recommendations](#optimization-recommendations)

## What Does This Agent Do?

Data-Juicer (DJ) is a one-stop system for text and multimodal data processing for large language models. It provides nearly 200 core data processing operators covering text, images, videos, and other multimodal data, and supports the full pipeline of data analysis, cleaning, and synthesis.

After running this example, you can:

- **Intelligent Query**: Find suitable operators from nearly 200 data processing operators for your data scenarios
- **Automated Pipeline**: Describe your data processing needs; the agent automatically generates Data-Juicer YAML configurations and executes them
- **Custom Extension**: Quickly develop custom operators for specific scenarios

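As an illustration of what the automated pipeline produces, a generated recipe for "drop samples whose text is shorter than 5 characters" might look like the following. Treat this as a hypothetical sketch: the field names follow Data-Juicer's recipe conventions, but the exact paths, operator, and thresholds are illustrative, not output captured from the agent.

```yaml
# Hypothetical recipe the agent could generate; paths and thresholds
# are illustrative.
project_name: 'demo-clean'
dataset_path: './data/demo-dataset-images.jsonl'   # input data
export_path: './outputs/demo-processed.jsonl'      # where results are written

process:
  - text_length_filter:   # keep samples whose text length is in range
      min_len: 5
```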
## Architecture

```
User Query
    ↓
Router Agent ──┐
               ├── Data Processing Agent (DJ Agent)
               │     ├── General file read/write tools
               │     ├── query_dj_operators (query DataJuicer operators)
               │     └── execute_safe_command (execute safe commands, including dj-process and dj-analyze)
               │
               └── Code Development Agent (DJ Dev Agent)
                     ├── General file read/write tools
                     ├── get_basic_files (get basic development knowledge)
                     ├── get_operator_example (get operator source-code examples related to the requirement)
                     └── configure_data_juicer_path (configure the DataJuicer path)
```

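Stripped of the framework, the routing idea above can be sketched in a few lines. This is a toy, framework-free sketch: in the real system an LLM-backed Router agent chooses among specialist agents exposed as tools, while here a keyword rule stands in for the model's decision.

```python
# Toy sketch of the router pattern: each specialist agent is a callable,
# and the router picks exactly one per query. In the real system the
# choice is made by an LLM; a keyword rule stands in for it here.

def dj_agent(query: str) -> str:
    return f"[dj_agent] processing data for: {query}"

def dj_dev_agent(query: str) -> str:
    return f"[dj_dev_agent] writing operator code for: {query}"

def route(query: str) -> str:
    # Stand-in for the Router agent's LLM decision.
    handler = dj_dev_agent if "operator" in query.lower() else dj_agent
    return handler(query)

print(route("Deduplicate my dataset"))
print(route("Develop a new filter operator"))
```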
## Quick Start

### System Requirements

- Python 3.10+
- A valid DashScope API key
- Optional: Data-Juicer source code (for custom operator development)

### Installation

```bash
# uv is recommended
uv pip install -r requirements.txt
```

or

```bash
pip install -r requirements.txt
```

### Configuration

1. **Set the API key**

```bash
export DASHSCOPE_API_KEY="your-dashscope-key"
```

2. **Optional: configure the Data-Juicer path (for custom operator development)**

```bash
export DATA_JUICER_PATH="your-data-juicer-path"
```

> **Tip**: You can also set this at runtime through conversation, for example:
> - "Help me set the DataJuicer path: /path/to/data-juicer"
> - "Help me update the DataJuicer path: /path/to/data-juicer"

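At startup the agent reads these settings from the environment; a minimal sketch of that pattern, assuming only the two variable names above (the error message wording and dict layout are illustrative):

```python
import os

def load_config() -> dict:
    # DASHSCOPE_API_KEY is required; DATA_JUICER_PATH is optional and can
    # also be set later through conversation with the dev agent.
    api_key = os.environ.get("DASHSCOPE_API_KEY")
    if not api_key:
        raise RuntimeError("DASHSCOPE_API_KEY environment variable is required")
    return {
        "api_key": api_key,
        "data_juicer_path": os.environ.get("DATA_JUICER_PATH"),  # may be None
    }

os.environ["DASHSCOPE_API_KEY"] = "your-dashscope-key"  # demo value
print(load_config())
```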
### Usage

Choose the running mode with the `-u` or `--use_studio` parameter:

```bash
# Use AgentScope Studio (provides an interactive interface)
python main.py --use_studio True

# Or use command-line mode (default)
python main.py
```

## Agent Introduction

### Data Processing Agent

Responsible for interacting with Data-Juicer and executing actual data processing tasks. It supports automatic operator recommendation from natural-language descriptions, configuration generation, and execution.

**Typical Use Cases:**
- **Data Cleaning**: Deduplication, removal of low-quality samples, format standardization
- **Multimodal Processing**: Process text, image, and video data simultaneously
- **Batch Conversion**: Format conversion, data augmentation, feature extraction

<details>
<summary>View Complete Example Log (from AgentScope Studio)</summary>
<img src="assets/dj_agent_image.png" width="100%">
</details>

### Code Development Agent (DJ Dev Agent)

Assists in developing custom data processing operators, powered by the `qwen3-coder-480b-a35b-instruct` model by default.

**Typical Use Cases:**
- Develop domain-specific filter or transformation operators
- Integrate proprietary data processing logic
- Extend Data-Juicer capabilities for specific scenarios

<details>
<summary>View Complete Example Log (from AgentScope Studio)</summary>
<img src="assets/dj_dev_agent_image.png" width="100%">
</details>

## Advanced Features

### Operator Retrieval

The DJ Agent implements an intelligent operator retrieval tool that quickly finds the most relevant operators among Data-Juicer's nearly 200 operators through an independent LLM query step. It is a key component that lets the data processing agent and the code development agent operate accurately.

Three retrieval modes are provided, to be chosen according to the scenario:

#### Retrieval Modes

**LLM Retrieval (default)**
- Uses the Qwen-Turbo model to match the most relevant operators
- Provides detailed matching reasons and relevance scores
- Suitable for scenarios requiring high-precision matching, at the cost of more tokens

**Vector Retrieval (vector)**
- Based on DashScope text embeddings and FAISS similarity search
- Fast and efficient, suitable for large-scale retrieval scenarios

**Auto Mode (auto)**
- Tries LLM retrieval first and automatically falls back to vector retrieval on failure

#### Usage

Specify the retrieval mode with the `-r` or `--retrieval_mode` parameter (the flag name matches the `retrieval_mode` argument of `main.py`):

```bash
python main.py --retrieval_mode vector
```

For more parameter descriptions, see `python main.py --help`.

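In spirit, vector retrieval embeds every operator description once, embeds the query, and returns the nearest neighbors. A dependency-free sketch of that idea follows; the real implementation uses DashScope embeddings and a FAISS index, while here a toy bag-of-words "embedding" with cosine similarity stands in, and the three operator descriptions are illustrative paraphrases.

```python
import math
from collections import Counter

# Illustrative operator descriptions (paraphrased, not the official docs).
OPERATORS = {
    "text_length_filter": "keep samples whose text length is within a range",
    "image_size_filter": "keep samples whose image file size is within a range",
    "document_deduplicator": "remove duplicate documents from the dataset",
}

def embed(text: str) -> Counter:
    # Toy bag-of-words "embedding"; DashScope would return a dense vector.
    return Counter(text.lower().split())

def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[t] * b[t] for t in a)
    na = math.sqrt(sum(v * v for v in a.values()))
    nb = math.sqrt(sum(v * v for v in b.values()))
    return dot / (na * nb) if na and nb else 0.0

def retrieve(query: str, top_k: int = 2) -> list[str]:
    q = embed(query)
    ranked = sorted(OPERATORS,
                    key=lambda op: cosine(q, embed(OPERATORS[op])),
                    reverse=True)
    return ranked[:top_k]

print(retrieve("remove duplicate documents"))
```

Auto mode is then just a try/except around the LLM path that falls back to a `retrieve()`-style lookup on failure.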
### MCP Agent

Data-Juicer provides MCP (Model Context Protocol) services through which operator information can be fetched and data processing executed over a native interface, making the agent easy to migrate and integrate without separate LLM queries and command-line calls.

#### MCP Server Types

Data-Juicer provides two MCP server modes:

**Recipe-Flow (Data Recipe)**
- Filter operators by type and tags
- Combine multiple operators into data recipes for execution

**Granular-Operators (Fine-grained Operators)**
- Expose each operator as an independent tool
- Flexibly specify the operator list through environment variables
- Build fully customized data processing pipelines

For details, see the [Data-Juicer MCP Service Documentation](https://modelscope.github.io/data-juicer/en/main/docs/DJ_service.html#mcp-server).

> **Note**: The Data-Juicer MCP server is in early development; features and tools may change as development continues.

#### Configuration

Configure the service address in `configs/mcp_config.json`:

```json
{
  "mcpServers": {
    "DJ_recipe_flow": {
      "url": "http://127.0.0.1:8080/sse"
    }
  }
}
```

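Reading the server address back out of this structure is straightforward; a small sketch (the JSON is inlined here so the snippet runs without the file on disk):

```python
import json

# Inline copy of configs/mcp_config.json so the sketch is self-contained.
MCP_CONFIG = json.loads("""
{
  "mcpServers": {
    "DJ_recipe_flow": {
      "url": "http://127.0.0.1:8080/sse"
    }
  }
}
""")

def server_url(name: str) -> str:
    # In the real project this would follow json.load(open("configs/mcp_config.json")).
    return MCP_CONFIG["mcpServers"][name]["url"]

print(server_url("DJ_recipe_flow"))
```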
#### Usage Methods

Enable the MCP Agent in place of the DJ Agent (quote the list so the shell passes it through unchanged):

```bash
# Enable the MCP Agent and the Dev Agent
python main.py --available_agents "[dj_mcp, dj_dev]"

# Or use the shorthand
python main.py -a "[dj_mcp, dj_dev]"
```

## Feature Preview

The Data-Juicer agent ecosystem is expanding rapidly. New agents currently in development or planned:

### Data-Juicer Q&A Agent (Demo Available)

Provides users with detailed answers about Data-Juicer operators, concepts, and best practices.

<video controls width="100%" height="auto" playsinline>
  <source src="https://github.com/user-attachments/assets/a8392691-81cf-4a25-94da-967dcf92c685" type="video/mp4">
  Your browser does not support the video tag.
</video>

### Data Analysis and Visualization Agent (In Development)

Generates data analysis and visualization results; expected to be released soon.

## Troubleshooting

### Common Issues

**Q: How do I get a DashScope API key?**
A: Visit the [DashScope official website](https://dashscope.aliyun.com/) to register an account and apply for an API key.

**Q: Why does operator retrieval fail?**
A: Check your network connection and API key configuration, or try switching to vector retrieval mode.

**Q: How do I debug custom operators?**
A: Make sure the Data-Juicer path is configured correctly, and review the example code provided by the code development agent.

**Q: What if the MCP service connection fails?**
A: Check whether the MCP server is running and confirm that the URL in the configuration file is correct.

### Optimization Recommendations

- For large-scale data processing, DataJuicer's distributed mode is recommended
- Set the batch size appropriately to balance memory usage and processing speed
- For more advanced data processing features (synthesis, Data-Model Co-Development), refer to the DataJuicer [documentation](https://modelscope.github.io/data-juicer/en/main/index.html)

---

**Contributing**: Issues and pull requests are welcome to improve AgentScope, the DataJuicer Agent, and [DataJuicer](https://modelscope.github.io/data-juicer/en/main/index.html#contribution-and-acknowledgements). If you run into problems or have feature suggestions, feel free to contact us.

260 data_juicer_agent/README_ZH.md (new file)
@@ -0,0 +1,260 @@
# DataJuicer Agent

A multi-agent data processing system built on [AgentScope](https://github.com/modelscope/agentscope) and [Data-Juicer (DJ)](https://github.com/modelscope/data-juicer). This project demonstrates how to leverage the natural-language understanding of large language models so that non-expert users can easily harness Data-Juicer's powerful data processing capabilities.

## 📋 Table of Contents

- [📋 Table of Contents](#-table-of-contents)
- [What Does This Agent Do?](#what-does-this-agent-do)
- [Architecture](#architecture)
- [Quick Start](#quick-start)
  - [System Requirements](#system-requirements)
  - [Installation](#installation)
  - [Configuration](#configuration)
  - [Usage](#usage)
- [Agent Introduction](#agent-introduction)
  - [Data Processing Agent](#data-processing-agent)
  - [Code Development Agent](#code-development-agent)
- [Advanced Features](#advanced-features)
  - [Operator Retrieval](#operator-retrieval)
    - [Retrieval Modes](#retrieval-modes)
    - [Usage](#usage-1)
  - [MCP Agent](#mcp-agent)
    - [MCP Server Types](#mcp-server-types)
    - [Configuration](#configuration-1)
    - [Usage Methods](#usage-methods)
- [Feature Preview](#feature-preview)
  - [Data-Juicer Q&A Agent (Demo Available)](#data-juicer-qa-agent-demo-available)
  - [Data Analysis and Visualization Agent (In Development)](#data-analysis-and-visualization-agent-in-development)
- [Common Issues](#common-issues)
- [Optimization Recommendations](#optimization-recommendations)

## What Does This Agent Do?

Data-Juicer (DJ) is a one-stop system for text and multimodal data processing for large language models. It provides nearly 200 core data processing operators covering text, images, videos, and other multimodal data, and supports the full pipeline of data analysis, cleaning, and synthesis.

After running this example, you can:

- **Intelligent Query**: Find suitable operators from nearly 200 data processing operators for your data scenarios
- **Automated Pipeline**: Describe your data processing needs; the agent automatically generates Data-Juicer YAML configurations and executes them
- **Custom Extension**: Quickly develop custom operators for specific scenarios

## Architecture

```
User Query
    ↓
Router Agent ──┐
               ├── Data Processing Agent (DJ Agent)
               │     ├── General file read/write tools
               │     ├── query_dj_operators (query DataJuicer operators)
               │     └── execute_safe_command (execute safe commands, including dj-process and dj-analyze)
               │
               └── Code Development Agent (DJ Dev Agent)
                     ├── General file read/write tools
                     ├── get_basic_files (get basic development knowledge)
                     ├── get_operator_example (get operator source-code examples related to the requirement)
                     └── configure_data_juicer_path (configure the DataJuicer path)
```

## Quick Start

### System Requirements

- Python 3.10+
- A valid DashScope API key
- Optional: Data-Juicer source code (for custom operator development)

### Installation

```bash
# uv is recommended
uv pip install -r requirements.txt
```

or

```bash
pip install -r requirements.txt
```

### Configuration

1. **Set the API key**

```bash
export DASHSCOPE_API_KEY="your-dashscope-key"
```

2. **Optional: configure the Data-Juicer path (for custom operator development)**

```bash
export DATA_JUICER_PATH="your-data-juicer-path"
```

> **Tip**: You can also set this at runtime through conversation, for example:
> - "Help me set the DataJuicer path: /path/to/data-juicer"
> - "Help me update the DataJuicer path: /path/to/data-juicer"

### Usage

Choose the running mode with the `-u` or `--use_studio` parameter:

```bash
# Use AgentScope Studio (provides an interactive interface)
python main.py --use_studio True

# Or use command-line mode (default)
python main.py
```

## Agent Introduction

### Data Processing Agent

Responsible for interacting with Data-Juicer and executing actual data processing tasks. It supports automatic operator recommendation from natural-language descriptions, configuration generation, and execution.

**Typical Use Cases:**
- **Data Cleaning**: Deduplication, removal of low-quality samples, format standardization
- **Multimodal Processing**: Process text, image, and video data simultaneously
- **Batch Conversion**: Format conversion, data augmentation, feature extraction

<details>
<summary>View Complete Example Log (from AgentScope Studio)</summary>
<img src="assets/dj_agent_image.png" width="100%">
</details>

### Code Development Agent

Assists in developing custom data processing operators, powered by the `qwen3-coder-480b-a35b-instruct` model by default.

**Typical Use Cases:**
- Develop domain-specific filter or transformation operators
- Integrate proprietary data processing logic
- Extend Data-Juicer capabilities for specific scenarios

<details>
<summary>View Complete Example Log (from AgentScope Studio)</summary>
<img src="assets/dj_dev_agent_image.png" width="100%">
</details>

## Advanced Features

### Operator Retrieval

The DJ Agent implements an intelligent operator retrieval tool that quickly finds the most relevant operators among Data-Juicer's nearly 200 operators through an independent LLM query step. It is a key component that lets the data processing agent and the code development agent operate accurately.

Three retrieval modes are provided, to be chosen according to the scenario:

#### Retrieval Modes

**LLM Retrieval (default)**
- Uses the Qwen-Turbo model to match the most relevant operators
- Provides detailed matching reasons and relevance scores
- Suitable for scenarios requiring high-precision matching, at the cost of more tokens

**Vector Retrieval (vector)**
- Based on DashScope text embeddings and FAISS similarity search
- Fast and efficient, suitable for large-scale retrieval scenarios

**Auto Mode (auto)**
- Tries LLM retrieval first and automatically falls back to vector retrieval on failure

#### Usage

Specify the retrieval mode with the `-r` or `--retrieval_mode` parameter:

```bash
python main.py --retrieval_mode vector
```

For more parameter descriptions, see `python main.py --help`.

### MCP Agent

Data-Juicer provides MCP (Model Context Protocol) services through which operator information can be fetched and data processing executed over a native interface, making the agent easy to migrate and integrate without separate LLM queries and command-line calls.

#### MCP Server Types

Data-Juicer provides two MCP server modes:

**Recipe-Flow (Data Recipe)**
- Filter operators by type and tags
- Combine multiple operators into data recipes for execution

**Granular-Operators (Fine-grained Operators)**
- Expose each operator as an independent tool
- Flexibly specify the operator list through environment variables
- Build fully customized data processing pipelines

For details, see the [Data-Juicer MCP Service Documentation](https://modelscope.github.io/data-juicer/en/main/docs/DJ_service.html#mcp-server).

> **Note**: The Data-Juicer MCP server is in early development; features and tools may change as development continues.

#### Configuration

Configure the service address in `configs/mcp_config.json`:

```json
{
  "mcpServers": {
    "DJ_recipe_flow": {
      "url": "http://127.0.0.1:8080/sse"
    }
  }
}
```

#### Usage Methods

Enable the MCP Agent in place of the DJ Agent (quote the list so the shell passes it through unchanged):

```bash
# Enable the MCP Agent and the Dev Agent
python main.py --available_agents "[dj_mcp, dj_dev]"

# Or use the shorthand
python main.py -a "[dj_mcp, dj_dev]"
```

## Feature Preview

The Data-Juicer agent ecosystem is expanding rapidly. New agents currently in development or planned:

### Data-Juicer Q&A Agent (Demo Available)

Provides users with detailed answers about Data-Juicer operators, concepts, and best practices.

<video controls width="100%" height="auto" playsinline>
  <source src="https://github.com/user-attachments/assets/a8392691-81cf-4a25-94da-967dcf92c685" type="video/mp4">
  Your browser does not support the video tag.
</video>

### Data Analysis and Visualization Agent (In Development)

Generates data analysis and visualization results; expected to be released soon.

### Common Issues

**Q: How do I get a DashScope API key?**
A: Visit the [DashScope official website](https://dashscope.aliyun.com/) to register an account and apply for an API key.

**Q: Why does operator retrieval fail?**
A: Check your network connection and API key configuration, or try switching to vector retrieval mode.

**Q: How do I debug custom operators?**
A: Make sure the Data-Juicer path is configured correctly, and review the example code provided by the code development agent.

**Q: What if the MCP service connection fails?**
A: Check whether the MCP server is running and confirm that the URL in the configuration file is correct.

### Optimization Recommendations

- For large-scale data processing, DataJuicer's distributed mode is recommended
- Set the batch size appropriately to balance memory usage and processing speed
- For more advanced data processing features (synthesis, Data-Model Co-Development), refer to the DataJuicer [documentation](https://modelscope.github.io/data-juicer/zh_CN/main/index_ZH)

---

**Contributing**: Issues and pull requests are welcome to improve AgentScope, the DataJuicer Agent, and [DataJuicer](https://modelscope.github.io/data-juicer/zh_CN/main/index_ZH#id4). If you run into problems or have feature suggestions, feel free to contact us.

0 data_juicer_agent/__init__.py (new file)

92 data_juicer_agent/agent_factory.py (new file)
@@ -0,0 +1,92 @@
# -*- coding: utf-8 -*-
"""
Agent Factory

Factory functions for creating and configuring agents with standardized toolkits.
"""

import os
from typing import Optional

from agentscope.agent import ReActAgent
from agentscope.tool import Toolkit
from agentscope.formatter import FormatterBase, OpenAIChatFormatter
from agentscope.model import ChatModelBase, OpenAIChatModel
from agentscope.memory import InMemoryMemory, MemoryBase


# Default configurations
DEFAULT_MODEL_CONFIG = {
    "model_name": "gpt-4o",
    "stream": False,
}


def get_default_model() -> OpenAIChatModel:
    """Create the default OpenAI model instance."""
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        raise ValueError("OPENAI_API_KEY environment variable is required")

    return OpenAIChatModel(api_key=api_key, **DEFAULT_MODEL_CONFIG)


def create_agent(
    name: str,
    sys_prompt: str,
    toolkit: Toolkit,
    description: Optional[str] = None,
    model: Optional[ChatModelBase] = None,
    formatter: Optional[FormatterBase] = None,
    memory: Optional[MemoryBase] = None,
    max_iters: int = 10,
    parallel_tool_calls: bool = False,
    **kwargs,
) -> ReActAgent:
    """
    Create a ReActAgent with standardized configuration.

    Args:
        name: Agent identifier
        sys_prompt: System prompt template (supports {name} placeholder)
        toolkit: Toolkit instance
        description: Optional human-readable description stored on the agent
        model: Language model (defaults to GPT-4o)
        formatter: Message formatter (defaults to OpenAIChatFormatter)
        memory: Memory instance (defaults to InMemoryMemory)
        max_iters: Maximum reasoning iterations
        parallel_tool_calls: Enable parallel tool execution
        **kwargs: Additional ReActAgent arguments

    Returns:
        Configured ReActAgent instance

    Example:
        >>> agent = create_agent(
        ...     name="sql_expert",
        ...     sys_prompt="You are {name}, a SQL database expert",
        ...     toolkit=sql_toolkit,
        ... )
    """
    # Set defaults
    if model is None:
        model = get_default_model()
    if formatter is None:
        formatter = OpenAIChatFormatter()
    if memory is None:
        memory = InMemoryMemory()

    # Create agent
    agent = ReActAgent(
        name=name,
        sys_prompt=sys_prompt.format(name=name),
        model=model,
        formatter=formatter,
        toolkit=toolkit,
        memory=memory,
        max_iters=max_iters,
        parallel_tool_calls=parallel_tool_calls,
        **kwargs,
    )

    agent.__doc__ = description

    return agent
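The value of this factory is default injection: call sites pass only what differs, and model/formatter/memory fall back to shared defaults. The pattern in isolation, as a framework-free toy (`Agent` here is a stand-in dataclass, not AgentScope's `ReActAgent`):

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class Agent:  # stand-in for ReActAgent
    name: str
    sys_prompt: str
    model: str
    memory: list = field(default_factory=list)

def create_agent(name: str, sys_prompt: str,
                 model: Optional[str] = None,
                 memory: Optional[list] = None) -> Agent:
    # Inject defaults only when the caller did not supply a value,
    # mirroring the real factory's handling of model/formatter/memory.
    return Agent(
        name=name,
        sys_prompt=sys_prompt.format(name=name),  # supports {name} placeholder
        model=model or "default-model",
        memory=memory if memory is not None else [],
    )

agent = create_agent("dj_agent", "You are {name}.")
print(agent.model, "|", agent.sys_prompt)
```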
BIN data_juicer_agent/assets/dj_agent_image.png (new file, binary file not shown; size 3.7 MiB)

BIN data_juicer_agent/assets/dj_dev_agent_image.png (new file, binary file not shown; size 10 MiB)

7 data_juicer_agent/configs/mcp_config.json (new file)
@@ -0,0 +1,7 @@
{
  "mcpServers": {
    "DJ_recipe_flow": {
      "url": "http://127.0.0.1:8080/sse"
    }
  }
}
3 data_juicer_agent/data/demo-dataset-images.jsonl (new file)

@@ -0,0 +1,3 @@
{"images":["./images/img1.png"], "text": "<__dj__image> A comfortable bed."}
{"images":["./images/img2.jpg"], "text": "<__dj__image> A bus."}
{"images":["./images/img3.jpg"], "text": "<__dj__image> Black and white photograph of a woman holding an umbrella."}

BIN data_juicer_agent/data/images/img1.png (new file, binary file not shown; size 167 KiB)
BIN data_juicer_agent/data/images/img2.jpg (new file, binary file not shown; size 185 KiB)
BIN data_juicer_agent/data/images/img3.jpg (new file, binary file not shown; size 111 KiB)

163 data_juicer_agent/main.py (new file)
@@ -0,0 +1,163 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import os
|
||||
import fire
|
||||
from typing import List
|
||||
|
||||
from agentscope.model import DashScopeChatModel
|
||||
from agentscope.formatter import DashScopeChatFormatter
|
||||
from agentscope.memory import InMemoryMemory
|
||||
from agentscope.agent import UserAgent
|
||||
|
||||
from agent_factory import create_agent
|
||||
from prompts import DJ_SYS_PROMPT, DJ_DEV_SYS_PROMPT, ROUTER_SYS_PROMPT, MCP_SYS_PROMPT
|
||||
from tools import (
|
||||
dj_toolkit,
|
||||
dj_dev_toolkit,
|
||||
mcp_tools,
|
||||
get_mcp_toolkit,
|
||||
agents2toolkit,
|
||||
)
|
||||
|
||||
# Create shared configuration
|
||||
model = DashScopeChatModel(
|
||||
model_name="qwen-max",
|
||||
api_key=os.environ["DASHSCOPE_API_KEY"],
|
||||
stream=True,
|
||||
enable_thinking=False,
|
||||
)
|
||||
|
||||
dev_model = DashScopeChatModel(
|
||||
model_name="qwen3-coder-480b-a35b-instruct",
|
||||
api_key=os.environ["DASHSCOPE_API_KEY"],
|
||||
stream=True,
|
||||
enable_thinking=False,
|
||||
)
|
||||
|
||||
formatter = DashScopeChatFormatter()
|
||||
memory = InMemoryMemory()
|
||||
|
||||
user = UserAgent("User")
|
||||
|
||||
|
||||
async def main(
|
||||
use_studio: bool = False,
|
||||
available_agents: List[str] = ["dj", "dj_dev"],
|
||||
retrieval_mode: str = "auto",
|
||||
):
|
||||
"""
|
||||
Main function for running the agent.
|
||||
|
||||
:param use_studio: Whether to use agentscope studio.
|
||||
:param available_agents: List of available agents. Options: dj, dj_dev, dj_mcp
|
||||
    :param retrieval_mode: Retrieval mode for operators. Options: auto, vector, llm
    """
    if "dj" in available_agents:
        # Set the global retrieval mode for the tools to use
        os.environ["RETRIEVAL_MODE"] = retrieval_mode
        print(f"Using retrieval mode: {retrieval_mode}")

    agents = []
    for agent_name in available_agents:
        if agent_name == "dj":
            # Create agents using the unified create_agent function
            dj_agent = create_agent(
                "datajuicer_agent",
                DJ_SYS_PROMPT,
                dj_toolkit,
                (
                    "A professional data preprocessing AI assistant with the following core capabilities:\n"
                    "Tool Matching\n"
                    "- Query and validate suitable DataJuicer operators;\n"
                    "Configuration Generation\n"
                    "- Create YAML configuration files and preview data;\n"
                    "Task Execution\n"
                    "- Run data processing pipelines and output results"
                ),
                model,
                formatter,
                memory,
            )
            agents.append(dj_agent)

        if agent_name == "dj_dev":
            # DJ development agent for operator development
            dj_dev_agent = create_agent(
                "dj_dev_agent",
                DJ_DEV_SYS_PROMPT,
                dj_dev_toolkit,
                (
                    "An expert DataJuicer development assistant specializing in creating new DataJuicer operators.\n"
                    "Core capabilities:\n"
                    "Reference Retrieval - fetch base classes and examples;\n"
                    "Environment Configuration - handle DATA_JUICER_PATH setup; if the user provides a "
                    "DataJuicer path that requires setup or an update, route the request to this agent;\n"
                    "Code Generation - write complete, convention-compliant operator code"
                ),
                dev_model,
                formatter,
                memory,
            )
            agents.append(dj_dev_agent)

        if agent_name == "dj_mcp":
            mcp_toolkit, _ = await get_mcp_toolkit()
            for tool in mcp_tools:
                mcp_toolkit.register_tool_function(tool)

            mcp_agent = create_agent(
                "mcp_datajuicer_agent",
                MCP_SYS_PROMPT,
                mcp_toolkit,
                (
                    "DataJuicer MCP agent powered by the Recipe Flow MCP server.\n"
                    "Core capabilities:\n"
                    "- Filter operators by tags/categories using the MCP protocol;\n"
                    "- Real-time data processing pipeline execution."
                ),
                model,
                formatter,
                memory,
            )
            agents.append(mcp_agent)

    # Router agent - uses agents2toolkit to dynamically generate tools from all agents
    router_agent = create_agent(
        "Router",
        ROUTER_SYS_PROMPT,
        agents2toolkit(agents),
        "A router agent that intelligently routes tasks to specialized DataJuicer agents",
        model,
        formatter,
        InMemoryMemory(),  # The router uses its own memory instance
    )

    if use_studio:
        import agentscope

        agentscope.init(
            studio_url="http://localhost:3000",
            project="data_agent",
        )

    msg = None
    while True:
        msg = await user(msg)
        if msg.get_text_content() == "exit":
            break
        # The router agent handles the entire task with automatic multi-step routing
        msg = await router_agent(msg)


if __name__ == "__main__":
    # Example tasks
    # project_root = os.path.abspath(os.path.dirname(__file__))
    # task = (
    #     f"The data is stored in {project_root}/data/demo-dataset-images.jsonl. "
    #     "Keep the samples whose text field length is less than 5 "
    #     "and whose image size is less than 100 KB, "
    #     "and save the output results to the ./outputs path."
    # )
    #
    # DJ development example task:
    # task = "I want to develop a new DataJuicer filter operator to filter out audio files without vocals"
    #
    fire.Fire(main)
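The interactive loop above repeatedly forwards user messages to the router agent, which dispatches each request to a specialized agent. A minimal, self-contained sketch of that routing loop, using stub agents and a hypothetical `Msg` class in place of AgentScope's real APIs (only the keyword check stands in for the LLM-driven routing):

```python
import asyncio


class Msg:
    """Hypothetical stand-in for AgentScope's message object."""

    def __init__(self, text):
        self.text = text

    def get_text_content(self):
        return self.text


async def router_agent(msg):
    # Route by a trivial keyword check; the real router delegates to an LLM.
    if "develop" in msg.text:
        return Msg("routed to dj_dev_agent")
    return Msg("routed to datajuicer_agent")


async def main():
    replies = []
    for text in ["filter short texts", "develop a new operator", "exit"]:
        msg = Msg(text)
        if msg.get_text_content() == "exit":
            break
        msg = await router_agent(msg)
        replies.append(msg.get_text_content())
    return replies


print(asyncio.run(main()))
```

The real loop differs only in that messages come from the `user` agent and the router can perform multiple routing steps per request.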
135 data_juicer_agent/prompts.py Normal file
@@ -0,0 +1,135 @@
# -*- coding: utf-8 -*-

DJ_SYS_PROMPT = """
You are an expert data preprocessing assistant named {name}, specializing in handling multimodal data including text, images, videos, and other AI model-related data.

You will strictly follow these steps sequentially:

- Data Preview (optional but recommended):
  Before generating the YAML, you may first use `view_text_file` to inspect a small subset of the raw data (e.g., the first 5-10 samples) so that you can:
  1. Verify the exact field names and formats;
  2. Decide appropriate values such as `text_keys`, `image_key`, and the parameters of subsequent operators.
  If the user requests or needs more specific data analysis, use the DataJuicer analyzer (`dj-analyze`) to analyze the data:
  1. After creating the configuration file according to the requirements (see Step 2 for how to create it), run:
     dj-analyze --config configs/your_analyzer.yaml
  2. You can also use auto mode to avoid writing a recipe. It analyzes a small part of your dataset (e.g., 1000 samples, set by the `auto_num` argument) with all Filters that produce stats:
     dj-analyze --auto --dataset_path xx.jsonl [--auto_num 1000]

Step 1: Tool Discovery and Matching
- First, use the `query_dj_operators` tool to get relevant DataJuicer operators based on the user's task description
- Analyze the retrieved operators and verify that they exactly match the functionality requested in the query
- If no suitable operators are found, immediately terminate the task
- If only partially matching operators exist, skip the unsupported parts and proceed

Step 2: Generate Configuration File
- Create a YAML configuration containing global parameters and operator configurations. Save it to a YAML file with a yaml dump API.
  After the file is created successfully, inform the user of its location. A file save failure means the task has failed.
  a. Global Parameters:
     - project_name: Project name
     - dataset_path: Real data path (never fabricate paths; set to `None` if unknown)
     - export_path: Output path (use the default if unspecified)
     - text_keys: Text field names to process
     - image_key: Image field name to process
     - np: Number of subprocesses
     Keep other parameters at their defaults.

  b. Operator Configuration:
     - Use the operators retrieved in Step 1 to configure the 'process' field
     - Ensure the configuration precisely matches the user's functional requirements

Step 3: Execute Processing Task
Pre-execution checks:
- dataset_path: Must be a valid user-provided path, and the path must exist
- process: The operator configuration list must exist
Terminate immediately if any check fails and explain why.

If all pre-execution checks pass, run: `dj-process --config ${{YAML_config_file}}`

Mandatory Requirements:
- Never ask me questions. Make reasonable assumptions for non-critical parameters
- Only generate the reply after the task has finished running
- Always start by retrieving relevant operators with the `query_dj_operators` tool

Configuration Template:
```yaml
# global parameters
project_name: {{your project name}}
dataset_path: {{path to your dataset directory or file}}
text_keys: {{text key to be processed}}
image_key: {{image key to be processed}}
np: {{number of subprocesses used to process your dataset}}
skip_op_error: false  # must be set to false

export_path: {{single file path to save the processed data; must be a jsonl file path, not a folder}}

# process schedule
# a list of several process operators with their arguments
process:
  - image_shape_filter:
      min_width: 100
      min_height: 100
  - text_length_filter:
      min_len: 5
      max_len: 10000
  - ...
```

Available Tools:
Function definitions:
```
{{index}}. {{function name}}: {{function description}}
    {{argument1 name}} ({{argument type}}): {{argument description}}
    {{argument2 name}} ({{argument type}}): {{argument description}}
```
"""

DJ_DEV_SYS_PROMPT = """
You are an expert DataJuicer operator development assistant named {name}, specializing in helping developers create new DataJuicer operators.

Development Workflow:
1. Understand the user requirements and identify the operator type (filter, mapper, deduplicator, etc.)
2. Call `get_basic_files()` to get the base_op classes and development guidelines
3. Call `get_operator_example(operator_type)` to get relevant examples
4. If the previous tools report that `DATA_JUICER_PATH` is not configured, **STOP** and request user input with a clear message asking for the value of `DATA_JUICER_PATH`
5. Once the user provides `DATA_JUICER_PATH`, call `configure_data_juicer_path(data_juicer_path)` with the provided value
   **Do not attempt to set or infer `DATA_JUICER_PATH` on your own**

Critical Requirements:
- NEVER guess or fabricate file paths or configuration values
- Always call `get_basic_files()` and `get_operator_example()` before writing code
- Write complete, runnable code following DataJuicer conventions
- Focus on practical implementation
"""

MCP_SYS_PROMPT = """You are {name}, an advanced DataJuicer MCP agent powered by an MCP server, specializing in handling multimodal data including text, images, videos, and other AI model-related data.

Analyze the user requirements and use the tools provided to you for data processing.

Before data processing, you can also:
- Use `view_text_file` to inspect a small subset of the raw data (e.g., the first 2-5 samples) in order to:
  1. Verify the exact field names and formats
  2. Determine appropriate parameter values such as text length ranges, language types, confidence thresholds, etc.
  3. Understand the data characteristics to optimize the operator parameter configuration
"""

ROUTER_SYS_PROMPT = """
You are an AI routing agent named {name}. Your primary responsibility is to analyze user queries and route them to the most appropriate specialized agent.

Key responsibilities:
1. Understand the user's intent and requirements
2. Select the most suitable agent from the available options
3. Properly handle user input requests coming from routed agents

When routing to an agent that requires user input:
- If the routed agent returns a response indicating that additional input or configuration requires user confirmation or submission, you must:
  1. Stop the current routing process
  2. Present the agent's request to the user directly
  3. Wait for the user's response before continuing
  4. Pass the user's input back to the appropriate agent

- NEVER fabricate or guess user input values (such as paths, configurations, etc.)
- Always ask the user for the required information when an agent needs it

Available agents and their capabilities will be provided as tools in your toolkit.
"""
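Step 3 of `DJ_SYS_PROMPT` above requires two checks before running `dj-process`: the dataset path must exist and a process list must be configured. A minimal sketch of those checks as a function (the function itself is hypothetical, not part of the agent's toolkit):

```python
import os


def pre_execution_checks(config: dict) -> list:
    """Return a list of reasons why the config must not be executed."""
    errors = []
    dataset_path = config.get("dataset_path")
    # dataset_path must be a real, existing user-provided path
    if not dataset_path or not os.path.exists(dataset_path):
        errors.append("dataset_path is missing or does not exist")
    # the operator configuration list must exist and be non-empty
    if not config.get("process"):
        errors.append("process operator list is missing")
    return errors


cfg = {
    "dataset_path": "/nonexistent/data.jsonl",
    "process": [{"text_length_filter": {"min_len": 5}}],
}
print(pre_execution_checks(cfg))
```

An agent following the prompt would terminate and report the returned reasons instead of invoking `dj-process` when the list is non-empty.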
5 data_juicer_agent/requirements.txt Normal file
@@ -0,0 +1,5 @@
agentscope>=1.0.5
py-data-juicer>=1.4.2
faiss-cpu>=1.12.0
fire>=0.7.1
langchain-community
88 data_juicer_agent/tools/__init__.py Normal file
@@ -0,0 +1,88 @@
# -*- coding: utf-8 -*-
"""
Tools package for data-agent.

This module provides a unified entry point for all agent tools,
organized by agent type for easy access and management.
"""
from typing import Callable, List

from agentscope.agent import AgentBase
from agentscope.tool import (
    Toolkit,
    view_text_file,
    write_text_file,
)

from .dj_helpers import execute_safe_command, query_dj_operators
from .dj_dev_helpers import (
    get_basic_files,
    get_operator_example,
    configure_data_juicer_path,
)
from .mcp_helpers import get_mcp_toolkit
from .router_helpers import agent_to_tool


def create_toolkit(tools: List[Callable]) -> Toolkit:
    """Create a toolkit and register the given tool functions."""
    toolkit = Toolkit()
    for tool in tools:
        toolkit.register_tool_function(tool)

    return toolkit


# DJ agent tools
dj_tools = [
    execute_safe_command,
    view_text_file,
    write_text_file,
    query_dj_operators,
]

# DJ development agent tools - for developing DataJuicer operators
dj_dev_tools = [
    view_text_file,
    write_text_file,
    get_basic_files,
    get_operator_example,
    configure_data_juicer_path,
]

# MCP agent tools - for advanced data processing with Recipe Flow MCP
mcp_tools = [
    view_text_file,
    write_text_file,
]


def agents2toolkit(agents: List[AgentBase]) -> Toolkit:
    """Wrap each agent as a tool function and register them in a new toolkit."""
    tools = [agent_to_tool(agent) for agent in agents]
    return create_toolkit(tools)


dj_toolkit = create_toolkit(dj_tools)
dj_dev_toolkit = create_toolkit(dj_dev_tools)


# All available toolkits
all_toolkit = {
    "dj": dj_toolkit,
    "dj_dev": dj_dev_toolkit,
    "dj_mcp": get_mcp_toolkit,
    "router": agents2toolkit,
}

# Public API
__all__ = [
    "dj_tools",
    "dj_dev_tools",
    "mcp_tools",
    "all_toolkit",
    "agents2toolkit",
    "dj_toolkit",
    "dj_dev_toolkit",
    "get_mcp_toolkit",
    # Individual tools for direct import
    "execute_safe_command",
    "view_text_file",
    "write_text_file",
    "agent_to_tool",
    "query_dj_operators",
    "get_basic_files",
    "get_operator_example",
    "configure_data_juicer_path",
]
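The `create_toolkit` helper above simply registers each callable in a fresh toolkit. A stand-in sketch of the same pattern with a mock toolkit (the `MockToolkit` class is hypothetical; AgentScope's real `Toolkit` builds richer tool schemas for LLM tool calls):

```python
from typing import Callable, List


class MockToolkit:
    """Hypothetical stand-in for AgentScope's Toolkit."""

    def __init__(self):
        self.tools = {}

    def register_tool_function(self, fn: Callable) -> None:
        # Index tools by function name, as a toolkit does for tool calls.
        self.tools[fn.__name__] = fn


def create_mock_toolkit(tools: List[Callable]) -> MockToolkit:
    toolkit = MockToolkit()
    for tool in tools:
        toolkit.register_tool_function(tool)
    return toolkit


def view_text_file(path: str) -> str:
    # Trivial stub standing in for the real file-viewing tool
    return f"viewing {path}"


tk = create_mock_toolkit([view_text_file])
print(sorted(tk.tools))
```

The same mechanism underlies `agents2toolkit`: each agent is first wrapped as a callable by `agent_to_tool`, then registered like any other tool function.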
234 data_juicer_agent/tools/dj_dev_helpers.py Normal file
@@ -0,0 +1,234 @@
# -*- coding: utf-8 -*-
"""
DataJuicer Development Tools

Tools for developing DataJuicer operators, including access to basic
documentation and example code for different operator types.
"""

import os

from agentscope.message import TextBlock
from agentscope.tool import ToolResponse

# DataJuicer home path - should be configured for your environment
DATA_JUICER_PATH = os.getenv("DATA_JUICER_PATH", None)

BASIC_LIST_RELATIVE = [
    "data_juicer/ops/base_op.py",
    "docs/DeveloperGuide.md",
    "docs/DeveloperGuide_ZH.md",
]


def get_basic_files() -> ToolResponse:
    """Get the content of the basic DataJuicer development files.

    Returns the content of the essential files needed for DJ operator development:
    - base_op.py: Base operator class
    - DeveloperGuide.md: English developer guide
    - DeveloperGuide_ZH.md: Chinese developer guide

    Returns:
        ToolResponse: Combined content of all basic development files
    """
    if DATA_JUICER_PATH is None:
        return ToolResponse(
            content=[
                TextBlock(
                    type="text",
                    text="DATA_JUICER_PATH is not configured. Please ask the user to provide DATA_JUICER_PATH.",
                )
            ]
        )

    try:
        combined_content = "# DataJuicer Operator Development Basic Files\n\n"

        for relative_path in BASIC_LIST_RELATIVE:
            file_path = os.path.join(DATA_JUICER_PATH, relative_path)
            if os.path.exists(file_path):
                try:
                    with open(file_path, "r", encoding="utf-8") as f:
                        content = f.read()

                    filename = os.path.basename(file_path)
                    combined_content += f"## {filename}\n\n"
                    combined_content += (
                        f"```{'python' if filename.endswith('.py') else 'markdown'}\n"
                    )
                    combined_content += content
                    combined_content += "\n```\n\n"
                except Exception as e:
                    combined_content += (
                        f"## {os.path.basename(file_path)} (Read Failed)\n"
                    )
                    combined_content += f"Error: {str(e)}\n\n"

        return ToolResponse(content=[TextBlock(type="text", text=combined_content)])

    except Exception as e:
        return ToolResponse(
            content=[
                TextBlock(
                    type="text",
                    text=f"Error occurred while getting basic files: {str(e)}",
                )
            ]
        )


async def get_operator_example(
    requirement_description: str, limit: int = 2
) -> ToolResponse:
    """Get example operators matching a requirement description via dynamic search.

    Args:
        requirement_description (str): Natural language description of the operator requirement
        limit (int): Maximum number of example operators to return (default: 2)

    Returns:
        ToolResponse: Example operator code and test files matching the requirement
    """
    if DATA_JUICER_PATH is None:
        return ToolResponse(
            content=[
                TextBlock(
                    type="text",
                    text="DATA_JUICER_PATH is not configured. Please ask the user to provide DATA_JUICER_PATH.",
                )
            ]
        )

    try:
        # Import retrieve_ops from op_manager
        from .op_manager.op_retrieval import retrieve_ops

        # Query relevant operators using the requirement description;
        # honor the retrieval mode from the environment variable if set
        retrieval_mode = os.environ.get("RETRIEVAL_MODE", "auto")
        tool_names = await retrieve_ops(
            requirement_description, limit=limit, mode=retrieval_mode
        )

        if not tool_names:
            return ToolResponse(
                content=[
                    TextBlock(
                        type="text",
                        text=f"No relevant operators found for requirement: {requirement_description}\n"
                        "Please try more specific keywords or check that DATA_JUICER_PATH is properly configured.",
                    )
                ]
            )

        combined_content = (
            f"# Dynamic Operator Examples for: {requirement_description}\n\n"
        )
        combined_content += (
            f"Found {len(tool_names)} relevant operators (limit: {limit})\n\n"
        )

        # Process each found operator
        for i, tool_name in enumerate(tool_names[:limit]):
            combined_content += f"## {i + 1}. {tool_name}\n\n"

            # The operator type is the last component of the name,
            # e.g. "text_length_filter" -> "filter"
            op_type = tool_name.split("_")[-1]
            operator_path = f"data_juicer/ops/{op_type}/{tool_name}.py"

            # Try to find the operator source file
            full_path = os.path.join(DATA_JUICER_PATH, operator_path)
            if os.path.exists(full_path):
                with open(full_path, "r", encoding="utf-8") as f:
                    operator_code = f.read()

                combined_content += "### Source Code\n"
                combined_content += "```python\n"
                combined_content += operator_code
                combined_content += "\n```\n\n"
            else:
                combined_content += (
                    f"**Note:** Source code file not found for `{tool_name}`.\n\n"
                )

            test_path = f"tests/ops/{op_type}/test_{tool_name}.py"
            full_test_path = os.path.join(DATA_JUICER_PATH, test_path)
            if os.path.exists(full_test_path):
                with open(full_test_path, "r", encoding="utf-8") as f:
                    test_code = f.read()

                combined_content += "### Test Code\n"
                combined_content += f"**File Path:** `{test_path}`\n\n"
                combined_content += "```python\n"
                combined_content += test_code
                combined_content += "\n```\n\n"
            else:
                combined_content += (
                    f"**Note:** Test file not found for `{tool_name}`.\n\n"
                )

            combined_content += "---\n\n"

        return ToolResponse(content=[TextBlock(type="text", text=combined_content)])

    except Exception as e:
        return ToolResponse(
            content=[
                TextBlock(
                    type="text",
                    text=f"Error occurred while getting operator examples: {str(e)}\n"
                    "Please check the requirement description and try again.",
                )
            ]
        )


def configure_data_juicer_path(data_juicer_path: str) -> ToolResponse:
    """Configure the DataJuicer path.

    If the user provides the data_juicer_path, use this tool to configure it.

    Args:
        data_juicer_path (str): Path to the DataJuicer installation

    Returns:
        ToolResponse: Configuration result
    """
    global DATA_JUICER_PATH

    data_juicer_path = os.path.expanduser(data_juicer_path)

    try:
        if not os.path.exists(data_juicer_path):
            return ToolResponse(
                content=[
                    TextBlock(
                        type="text",
                        text=f"Specified DataJuicer path does not exist: {data_juicer_path}",
                    )
                ]
            )

        # Update the global DATA_JUICER_PATH
        DATA_JUICER_PATH = data_juicer_path

        return ToolResponse(
            content=[
                TextBlock(
                    type="text",
                    text=f"DataJuicer path has been updated to: {DATA_JUICER_PATH}",
                )
            ]
        )

    except Exception as e:
        return ToolResponse(
            content=[
                TextBlock(
                    type="text",
                    text=f"Error occurred while configuring DataJuicer path: {str(e)}",
                )
            ]
        )
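`get_operator_example` above derives the operator type from the last underscore-separated component of the operator name and maps it onto DataJuicer's repository layout for source and test files. That path convention in isolation (the helper function is illustrative, not part of the module):

```python
def operator_paths(tool_name: str) -> tuple:
    """Derive source and test paths for a DataJuicer operator name."""
    op_type = tool_name.split("_")[-1]  # e.g. "text_length_filter" -> "filter"
    source = f"data_juicer/ops/{op_type}/{tool_name}.py"
    test = f"tests/ops/{op_type}/test_{tool_name}.py"
    return source, test


print(operator_paths("text_length_filter"))
```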
224 data_juicer_agent/tools/dj_helpers.py Normal file
@@ -0,0 +1,224 @@
import os
import os.path as osp
import json
import asyncio
from typing import Any

from agentscope.message import TextBlock
from agentscope.tool import ToolResponse

from .op_manager.op_retrieval import retrieve_ops

# Path of the tool information used for formatting
TOOLS_INFO_PATH = osp.join(osp.dirname(__file__), "op_manager", "dj_funcs_all.json")


def _load_tools_info():
    """Load tool information from the JSON file, creating it if it does not exist."""
    if osp.exists(TOOLS_INFO_PATH):
        with open(TOOLS_INFO_PATH, "r", encoding="utf-8") as f:
            return json.load(f)
    else:
        from .op_manager.create_dj_func_info import dj_func_info

        with open(TOOLS_INFO_PATH, "w", encoding="utf-8") as f:
            json.dump(dj_func_info, f)
        return dj_func_info


def _format_tool_names_to_class_entries(tool_names):
    """Convert a list of tool names to a formatted string of class entries."""
    if not tool_names:
        return ""

    tools_info = _load_tools_info()

    # Map class_name to tool info for quick lookup
    tools_map = {tool["class_name"]: tool for tool in tools_info}

    formatted_entries = []
    for i, tool_name in enumerate(tool_names):
        if tool_name in tools_map:
            tool_info = tools_map[tool_name]
            class_entry = (
                f"{i + 1}. {tool_info['class_name']}: {tool_info['class_desc']}"
            )
            class_entry += "\n" + tool_info["arguments"]
            formatted_entries.append(class_entry)

    return "\n".join(formatted_entries)


async def query_dj_operators(query: str, limit: int = 20) -> ToolResponse:
    """Query DataJuicer operators by natural language description.

    Retrieves relevant operators from the DataJuicer library based on the user
    query. Supports matching by functionality, data type, and processing scenario.

    Args:
        query (str): Natural language operator query
        limit (int): Maximum number of operators to return (default: 20)

    Returns:
        ToolResponse: Tool response containing the matched operators with names,
            descriptions, and parameters
    """
    try:
        # Retrieve operator names, honoring the retrieval mode from the
        # environment variable if set
        retrieval_mode = os.environ.get("RETRIEVAL_MODE", "auto")
        tool_names = await retrieve_ops(query, limit=limit, mode=retrieval_mode)

        if not tool_names:
            return ToolResponse(
                content=[
                    TextBlock(
                        type="text",
                        text=f"No matching DataJuicer operators found for query: {query}\n"
                        "Suggestions:\n"
                        "1. Use more specific keywords like 'text filter' or 'image processing'\n"
                        "2. Check spelling and try alternative terms\n"
                        "3. Try English keywords for better matching",
                    )
                ],
            )

        # Format tool names to class entries
        retrieved_operators = _format_tool_names_to_class_entries(tool_names)

        # Format the response
        result_text = "🔍 DataJuicer Operator Query Results\n"
        result_text += f"Query: {query}\n"
        result_text += f"Limit: {limit} operators\n"
        result_text += f"{'=' * 50}\n\n"
        result_text += retrieved_operators

        return ToolResponse(
            content=[
                TextBlock(
                    type="text",
                    text=result_text,
                )
            ],
        )

    except Exception as e:
        return ToolResponse(
            content=[
                TextBlock(
                    type="text",
                    text=f"Error querying DataJuicer operators: {str(e)}\n"
                    "Please verify the query parameters and retry.",
                )
            ],
        )


async def execute_safe_command(
    command: str,
    timeout: int = 300,
    **kwargs: Any,
) -> ToolResponse:
    """Execute safe commands, including DataJuicer commands and other safe system commands.

    Returns the return code, standard output, and standard error within
    <returncode></returncode>, <stdout></stdout>, and <stderr></stderr> tags.

    Args:
        command (`str`):
            The command to execute. Allowed commands include:
            - DataJuicer commands: dj-process, dj-analyze
            - File system commands: mkdir, ls, pwd, cat, echo, cp, mv, rm
            - Text processing: grep, head, tail, wc, sort, uniq
            - Archive commands: tar, zip, unzip
            - Other safe commands: which, whoami, date, find
        timeout (`int`, defaults to `300`):
            The maximum time (in seconds) allowed for the command to run.

    Returns:
        `ToolResponse`:
            The tool response containing the return code, standard output, and
            standard error of the executed command.
    """
    # Security check: only allow safe commands
    command_stripped = command.strip()

    # Allowed commands
    allowed_commands = [
        # DataJuicer commands
        "dj-process", "dj-analyze",
        # File system operations
        "mkdir", "ls", "pwd", "cat", "echo", "cp", "mv", "rm",
        # Text processing
        "grep", "head", "tail", "wc", "sort", "uniq",
        # Archive operations
        "tar", "zip", "unzip",
        # Information commands
        "which", "whoami", "date", "find",
        # Python commands
        "python", "python3", "pip", "uv",
    ]

    # Compare against the first token so that e.g. "rmdir" does not match "rm"
    first_token = command_stripped.split()[0] if command_stripped else ""
    command_allowed = False
    if first_token in allowed_commands:
        # Additional security checks for potentially dangerous commands:
        # prevent path operations for rm/mv
        if first_token in ["rm", "mv"] and (
            "/" in command_stripped or ".." in command_stripped
        ):
            command_allowed = False
        else:
            command_allowed = True

    if not command_allowed:
        error_msg = (
            f"Error: Command not allowed for security reasons. "
            f"Allowed commands: {', '.join(allowed_commands)}. "
            f"Received command: {command}"
        )
        return ToolResponse(
            content=[
                TextBlock(
                    type="text",
                    text=(
                        f"<returncode>-1</returncode>"
                        f"<stdout></stdout>"
                        f"<stderr>{error_msg}</stderr>"
                    ),
                ),
            ],
        )

    proc = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        bufsize=0,
    )

    try:
        await asyncio.wait_for(proc.wait(), timeout=timeout)
        stdout, stderr = await proc.communicate()
        stdout_str = stdout.decode("utf-8")
        stderr_str = stderr.decode("utf-8")
        returncode = proc.returncode

    except asyncio.TimeoutError:
        stderr_suffix = (
            f"TimeoutError: The command execution exceeded "
            f"the timeout of {timeout} seconds."
        )
        returncode = -1
        try:
            proc.terminate()
            stdout, stderr = await proc.communicate()
            stdout_str = stdout.decode("utf-8")
            stderr_str = stderr.decode("utf-8")
            if stderr_str:
                stderr_str += f"\n{stderr_suffix}"
            else:
                stderr_str = stderr_suffix
        except ProcessLookupError:
            stdout_str = ""
            stderr_str = stderr_suffix

    return ToolResponse(
        content=[
            TextBlock(
                type="text",
                text=(
                    f"<returncode>{returncode}</returncode>"
                    f"<stdout>{stdout_str}</stdout>"
                    f"<stderr>{stderr_str}</stderr>"
                ),
            ),
        ],
    )
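`execute_safe_command` above returns its result wrapped in `<returncode>`, `<stdout>`, and `<stderr>` tags. A small helper (illustrative, not part of the toolkit) showing how a caller could parse that tagged format back into a dict:

```python
import re


def parse_command_result(text: str) -> dict:
    """Extract the tagged fields from an execute_safe_command result string."""
    result = {}
    for tag in ("returncode", "stdout", "stderr"):
        # DOTALL so multi-line stdout/stderr bodies are captured
        m = re.search(f"<{tag}>(.*?)</{tag}>", text, flags=re.DOTALL)
        result[tag] = m.group(1) if m else None
    return result


sample = "<returncode>0</returncode><stdout>done</stdout><stderr></stderr>"
print(parse_command_result(sample))
```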
120 data_juicer_agent/tools/mcp_helpers.py Normal file
@@ -0,0 +1,120 @@
import json
import os
import logging
import string
from typing import List, Optional, Tuple

from agentscope.tool import Toolkit
from agentscope.mcp import HttpStatefulClient, HttpStatelessClient, StdIOStatefulClient

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

root_path = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))


def _load_config(config_path: str) -> dict:
    """Load the MCP configuration from a file."""
    try:
        if os.path.exists(config_path):
            with open(config_path, "r", encoding="utf-8") as f:
                config = json.load(f)
            logger.info(f"Loaded MCP configuration from {config_path}")
            return config
        else:
            logger.warning(
                f"Configuration file {config_path} not found, using default settings"
            )
            return _create_default_config()
    except Exception as e:
        logger.error(f"Error loading configuration: {e}")
        return _create_default_config()


def _create_default_config() -> dict:
    """Create the default configuration."""
    return {
        "mcpServers": {
            "dj_recipe_flow": {
                "command": "python",
                "args": ["/home/test/data_juicer/tools/DJ_mcp_recipe_flow.py"],
                "env": {"SERVER_TRANSPORT": "stdio"},
            }
        }
    }


def _expand_env_vars(value: str) -> str:
    """Expand environment variables in configuration values."""
    if isinstance(value, str):
        template = string.Template(value)
        try:
            return template.substitute(os.environ)
        except KeyError as e:
            logger.warning(f"Environment variable not found: {e}")
            return value
    return value


async def _create_clients(config: dict, toolkit: Toolkit) -> List:
    """Create MCP clients based on the configuration."""
    server_configs = config.get("mcpServers", {})
    clients = []

    for server_name, server_config in server_configs.items():
        try:
            # Handle StdIO clients
            if "command" in server_config:
                command = server_config["command"]
                args = server_config.get("args", [])
                env = server_config.get("env", {})

                # Expand environment variables
                expanded_args = [_expand_env_vars(arg) for arg in args]
                expanded_env = {k: _expand_env_vars(v) for k, v in env.items()}

                client = StdIOStatefulClient(
                    name=server_name,
                    command=command,
                    args=expanded_args,
                    env=expanded_env,
                )

                await client.connect()
                await toolkit.register_mcp_client(client)

            # Handle HTTP clients
            elif "url" in server_config:
                url = _expand_env_vars(server_config["url"])
                transport = server_config.get("transport", "sse")
                stateful = server_config.get("stateful", True)

                if stateful:
                    client = HttpStatefulClient(
                        name=server_name, transport=transport, url=url
                    )
                    await client.connect()
                    await toolkit.register_mcp_client(client)
                else:
                    client = HttpStatelessClient(
                        name=server_name, transport=transport, url=url
                    )
                    await toolkit.register_mcp_client(client)

            else:
                raise ValueError("Invalid server configuration")

            clients.append(client)
        except Exception as e:
            if "Invalid server configuration" in str(e):
                raise e
            logger.error(f"Failed to create client {server_name}: {e}")

    return clients


async def get_mcp_toolkit(
    config_path: Optional[str] = None,
) -> Tuple[Toolkit, List]:
    """Get a toolkit with all MCP tools registered, along with the created clients."""
    config_path = config_path or os.path.join(root_path, "configs", "mcp_config.json")
    config = _load_config(config_path)
    toolkit = Toolkit()

    clients = await _create_clients(config, toolkit)

    return toolkit, clients
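`_expand_env_vars` above relies on `string.Template` substitution and leaves values untouched when a referenced variable is missing. The same behavior in isolation, against a controlled mapping (a plain dict instead of `os.environ`, so the result is deterministic):

```python
import string


def expand(value: str, env: dict) -> str:
    """Substitute $VAR / ${VAR} references; return the value unchanged on a miss."""
    template = string.Template(value)
    try:
        return template.substitute(env)
    except KeyError:
        # Unknown variables leave the value unchanged, as in the helper above.
        return value


print(expand("${HOME_DIR}/configs/mcp_config.json", {"HOME_DIR": "/opt/app"}))
print(expand("$MISSING/x", {}))
```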
0 data_juicer_agent/tools/op_manager/__init__.py Normal file
34 data_juicer_agent/tools/op_manager/create_dj_func_info.py Normal file
@@ -0,0 +1,34 @@
import inspect

from data_juicer.tools.op_search import OPSearcher

searcher = OPSearcher(include_formatter=False)

all_ops = searcher.search()

dj_func_info = []
for i, op in enumerate(all_ops):
    class_entry = {"index": i, "class_name": op["name"], "class_desc": op["desc"]}
    param_desc = op["param_desc"]
    param_desc_map = {}
    args = ""
    for item in param_desc.split(":param"):
        _item = item.split(":")
        if len(_item) < 2:
            continue
        param_desc_map[_item[0].strip()] = ":".join(_item[1:]).strip()

    if op["sig"]:
        for param_name, param in op["sig"].parameters.items():
            if param_name in ["self", "args", "kwargs"]:
                continue
            if param.kind in (
                inspect.Parameter.VAR_POSITIONAL,
                inspect.Parameter.VAR_KEYWORD,
            ):
                continue
            if param_name in param_desc_map:
                args += f"    {param_name} ({param.annotation}): {param_desc_map[param_name]}\n"
            else:
                args += f"    {param_name} ({param.annotation})\n"
    class_entry["arguments"] = args
    dj_func_info.append(class_entry)
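The `:param` parsing loop above can be exercised on a standalone docstring fragment; this sketch applies the same split-and-rejoin logic to a hypothetical operator description (the parameter names are made up for illustration):

```python
# A hypothetical Sphinx-style parameter block, as op["param_desc"] would carry it
param_desc = (
    ":param min_len: minimum text length to keep\n"
    ":param max_len: maximum text length to keep"
)

param_desc_map = {}
for item in param_desc.split(":param"):
    _item = item.split(":")
    if len(_item) < 2:
        continue
    # key is the parameter name; rejoining preserves any colons in the description
    param_desc_map[_item[0].strip()] = ":".join(_item[1:]).strip()

print(param_desc_map)
# {'min_len': 'minimum text length to keep', 'max_len': 'maximum text length to keep'}
```

Rejoining with `":".join(_item[1:])` matters: a description such as "ratio: 0.5 by default" would otherwise be truncated at its first colon.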
1
data_juicer_agent/tools/op_manager/dj_funcs_all.json
Normal file
File diff suppressed because one or more lines are too long
380
data_juicer_agent/tools/op_manager/op_retrieval.py
Normal file
@@ -0,0 +1,380 @@
import os
import os.path as osp
import json
import logging
import hashlib
import time
from typing import Optional

from langchain_community.vectorstores import FAISS

TOOLS_INFO_PATH = osp.join(osp.dirname(__file__), "dj_funcs_all.json")
CACHE_RETRIEVED_TOOLS_PATH = osp.join(osp.dirname(__file__), "cache_retrieve")
VECTOR_INDEX_CACHE_PATH = osp.join(osp.dirname(__file__), "vector_index_cache")

# Global variables to cache the vector store
_cached_vector_store: Optional[FAISS] = None
_cached_tools_info: Optional[list] = None
_cached_file_hash: Optional[str] = None

RETRIEVAL_PROMPT = """You are a professional tool retrieval assistant responsible for filtering the top {limit} most relevant tools from a large tool library based on user requirements. Execute the following steps:

# Requirement Analysis
Carefully read the user's [requirement description], extract core keywords, functional objectives, usage scenarios, and technical requirements (such as real-time performance, data types, industry domains, etc.).

# Tool Matching
Perform multi-dimensional matching based on the following tool attributes:
- Tool name and functional description
- Supported input/output formats
- Applicable industry or scenario tags
- Technical implementation principles (API, local deployment, AI model types)
- Relevance ranking

# Use a weighted scoring mechanism (example weights):
- Functional match (40%)
- Scenario compatibility (30%)
- Technical compatibility (20%)
- User rating/usage rate (10%)

# Deduplication and Optimization
Exclude the following low-quality results:
- Tools with duplicate functionality (keep only the best one)
- Tools that cannot meet basic requirements
- Tools missing critical parameter descriptions

# Constraints
- Strictly limit output to a maximum of {limit} tools
- Do not speculate on unknown tool attributes
- Maintain accuracy of domain expertise

# Output Format
Return a JSON-format TOP {limit} tool list containing:
[
  {{
    "rank": 1,
    "tool_name": "Tool Name",
    "description": "Core functionality summary",
    "relevance_score": 98.7,
    "key_match": ["Matching keywords/features"]
  }}
]
Output strictly in JSON array format, and output only the JSON array of tools.
"""
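The prompt demands a strict JSON array, which the caller parses with `json.loads` and then validates entry by entry. A minimal sketch of that validation step, on hypothetical model output (the operator name is illustrative):

```python
import json

# Hypothetical model reply following the prompt's required output format
model_output = (
    '[{"rank": 1, "tool_name": "clean_html_mapper", "relevance_score": 97.2}]'
)

entries = json.loads(model_output)
# Keep only well-formed dict entries that actually name a tool
tool_names = [
    e["tool_name"] for e in entries if isinstance(e, dict) and "tool_name" in e
]
print(tool_names)  # ['clean_html_mapper']
```

Validating each entry before use guards against a model that returns prose, a bare string list, or objects missing the `tool_name` field.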
def fast_text_encoder(text: str) -> str:
    """Fast encoding using the xxHash algorithm."""
    import xxhash

    hasher = xxhash.xxh64(seed=0)
    hasher.update(text.encode("utf-8"))

    # Return a 16-character hexadecimal string
    return hasher.hexdigest()


async def retrieve_ops_lm(user_query, limit=20):
    """Tool retrieval using a language model - returns a list of tool names."""
    hash_id = fast_text_encoder(user_query + str(limit))

    # Ensure the cache directory exists
    os.makedirs(CACHE_RETRIEVED_TOOLS_PATH, exist_ok=True)

    cache_tools_path = osp.join(CACHE_RETRIEVED_TOOLS_PATH, f"{hash_id}.json")
    if osp.exists(cache_tools_path):
        with open(cache_tools_path, "r", encoding="utf-8") as f:
            return json.loads(f.read())

    if osp.exists(TOOLS_INFO_PATH):
        with open(TOOLS_INFO_PATH, "r", encoding="utf-8") as f:
            dj_func_info = json.loads(f.read())
    else:
        from create_dj_func_info import dj_func_info

        # TOOLS_INFO_PATH is already absolute; persist the generated info there
        with open(TOOLS_INFO_PATH, "w") as f:
            f.write(json.dumps(dj_func_info))

    tool_descriptions = [
        f"{t['class_name']}: {t['class_desc']}" for t in dj_func_info
    ]
    tools_string = "\n".join(tool_descriptions)

    from agentscope.model import DashScopeChatModel
    from agentscope.message import Msg
    from agentscope.formatter import DashScopeChatFormatter

    model = DashScopeChatModel(
        model_name="qwen-turbo",
        api_key=os.environ.get("DASHSCOPE_API_KEY"),
        stream=False,
    )

    formatter = DashScopeChatFormatter()

    # Fill the retrieval prompt with the requested limit
    retrieval_prompt_with_limit = RETRIEVAL_PROMPT.format(limit=limit)

    user_prompt = (
        retrieval_prompt_with_limit
        + """
User requirement description:
{user_query}

Available tools:
{tools_string}
""".format(
            user_query=user_query, tools_string=tools_string
        )
    )

    msgs = [
        Msg(name="user", role="user", content=user_prompt),
    ]

    formatted_msgs = await formatter.format(msgs)

    response = await model(formatted_msgs)

    msg = Msg(name="assistant", role="assistant", content=response.content)
    retrieved_tools_text = msg.get_text_content()
    retrieved_tools = json.loads(retrieved_tools_text)

    # Extract tool names and validate that they exist
    tool_names = []
    for tool_info in retrieved_tools:
        if not isinstance(tool_info, dict) or "tool_name" not in tool_info:
            logging.warning(f"Invalid tool info format: {tool_info}")
            continue

        tool_name = tool_info["tool_name"]

        # Verify the tool exists in dj_func_info
        tool_exists = any(t["class_name"] == tool_name for t in dj_func_info)
        if not tool_exists:
            logging.error(f"Tool not found: `{tool_name}`, skipping!")
            continue

        tool_names.append(tool_name)

    # Cache the result
    with open(cache_tools_path, "w", encoding="utf-8") as f:
        json.dump(tool_names, f)

    return tool_names
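`fast_text_encoder` above depends on the third-party `xxhash` package. A stdlib-only alternative with the same shape (query plus limit hashed to a short hex key) could use `hashlib` at some speed cost; the function name here is hypothetical, not part of the module:

```python
import hashlib


def cache_key(user_query: str, limit: int) -> str:
    """Derive a stable 16-character hex cache key for (query, limit)."""
    digest = hashlib.sha256(f"{user_query}{limit}".encode("utf-8")).hexdigest()
    # Truncate to 16 hex chars, matching the width of an xxh64 digest
    return digest[:16]


k1 = cache_key("filter short texts", 20)
k2 = cache_key("filter short texts", 20)
k3 = cache_key("filter short texts", 10)
assert k1 == k2  # deterministic across runs and processes
assert k1 != k3  # the limit is part of the key, so different limits cache separately
```

Because the key covers both the query and the limit, a repeated call with the same inputs hits the JSON cache file instead of re-querying the model.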
def _get_file_hash(file_path: str) -> str:
    """Get the file content hash using SHA-256."""
    try:
        with open(file_path, "rb") as f:
            file_content = f.read()
        return hashlib.sha256(file_content).hexdigest()
    except (OSError, IOError):
        return ""


def _load_cached_index() -> bool:
    """Load the cached vector index from disk."""
    global _cached_vector_store, _cached_tools_info, _cached_file_hash

    try:
        # Ensure the cache directory exists
        os.makedirs(VECTOR_INDEX_CACHE_PATH, exist_ok=True)

        index_path = osp.join(VECTOR_INDEX_CACHE_PATH, "faiss_index")
        metadata_path = osp.join(VECTOR_INDEX_CACHE_PATH, "metadata.json")

        if not all(os.path.exists(p) for p in [index_path, metadata_path]):
            return False

        # Check whether the cached index matches the current tools info file
        with open(metadata_path, "r") as f:
            metadata = json.load(f)

        cached_hash = metadata.get("tools_info_hash", "")
        current_hash = _get_file_hash(TOOLS_INFO_PATH)

        if current_hash != cached_hash:
            return False

        # Load the cached data
        from langchain_community.embeddings import DashScopeEmbeddings

        embeddings = DashScopeEmbeddings(
            dashscope_api_key=os.environ.get("DASHSCOPE_API_KEY"),
            model="text-embedding-v1",
        )

        _cached_vector_store = FAISS.load_local(
            index_path, embeddings, allow_dangerous_deserialization=True
        )

        _cached_file_hash = cached_hash

        logging.info("Successfully loaded cached vector index")
        return True

    except Exception as e:
        logging.warning(f"Failed to load cached index: {e}")
        return False


def _save_cached_index():
    """Save the vector index to the disk cache."""
    global _cached_vector_store, _cached_file_hash

    try:
        # Ensure the cache directory exists
        os.makedirs(VECTOR_INDEX_CACHE_PATH, exist_ok=True)

        index_path = osp.join(VECTOR_INDEX_CACHE_PATH, "faiss_index")
        metadata_path = osp.join(VECTOR_INDEX_CACHE_PATH, "metadata.json")

        # Save the vector store
        if _cached_vector_store:
            _cached_vector_store.save_local(index_path)

        # Save the metadata
        metadata = {"tools_info_hash": _cached_file_hash, "created_at": time.time()}
        with open(metadata_path, "w") as f:
            json.dump(metadata, f)

        logging.info("Successfully saved vector index to cache")

    except Exception as e:
        logging.error(f"Failed to save cached index: {e}")


def _build_vector_index():
    """Build and cache the vector index."""
    global _cached_vector_store, _cached_file_hash

    with open(TOOLS_INFO_PATH, "r", encoding="utf-8") as f:
        tools_info = json.loads(f.read())

    tool_descriptions = [f"{t['class_name']}: {t['class_desc']}" for t in tools_info]

    from langchain_community.embeddings import DashScopeEmbeddings

    embeddings = DashScopeEmbeddings(
        dashscope_api_key=os.environ.get("DASHSCOPE_API_KEY"), model="text-embedding-v1"
    )

    metadatas = [{"index": i} for i in range(len(tool_descriptions))]
    vector_store = FAISS.from_texts(tool_descriptions, embeddings, metadatas=metadatas)

    # Cache the results
    _cached_vector_store = vector_store
    _cached_file_hash = _get_file_hash(TOOLS_INFO_PATH)

    # Save to the disk cache
    _save_cached_index()

    logging.info("Successfully built and cached vector index")
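`_load_cached_index` invalidates the cache whenever the tools file changes, by comparing SHA-256 hashes. The mechanism can be demonstrated with a temporary file (the helper name and file content here are illustrative, not part of the module):

```python
import hashlib
import os
import tempfile


def file_hash(path: str) -> str:
    """SHA-256 hex digest of a file's bytes (mirrors _get_file_hash above)."""
    with open(path, "rb") as f:
        return hashlib.sha256(f.read()).hexdigest()


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "dj_funcs_all.json")
    with open(path, "w") as f:
        f.write('[{"class_name": "clean_html_mapper"}]')
    cached = file_hash(path)

    # Unchanged file -> hashes match, the cached FAISS index would be reused
    assert file_hash(path) == cached

    # Modified file -> hashes differ, the index must be rebuilt
    with open(path, "a") as f:
        f.write("\n")
    assert file_hash(path) != cached
```

Hashing the content (rather than comparing modification times) keeps the check robust against file copies and checkouts that reset timestamps.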
def retrieve_ops_vector(user_query, limit=20):
    """Tool retrieval using cached vector search - returns a list of tool names."""
    global _cached_vector_store

    # Try to load from the cache first
    if not _load_cached_index():
        logging.info("Building new vector index...")
        _build_vector_index()

    # Perform the similarity search
    retrieved_tools = _cached_vector_store.similarity_search(user_query, k=limit)
    retrieved_indices = [doc.metadata["index"] for doc in retrieved_tools]

    with open(TOOLS_INFO_PATH, "r", encoding="utf-8") as f:
        tools_info = json.loads(f.read())

    # Extract tool names from the retrieved indices
    tool_names = []
    for raw_idx in retrieved_indices:
        tool_info = tools_info[raw_idx]
        tool_names.append(tool_info["class_name"])

    return tool_names


async def retrieve_ops(user_query: str, limit: int = 20, mode: str = "auto") -> list:
    """
    Tool retrieval with a configurable mode.

    Args:
        user_query: User query string
        limit: Maximum number of tools to retrieve
        mode: Retrieval mode - "llm", "vector", or "auto" (default: "auto")
            - "llm": Use the language model only
            - "vector": Use vector search only
            - "auto": Try the LLM first, fall back to vector search on failure

    Returns:
        List of tool names
    """
    if mode == "llm":
        try:
            return await retrieve_ops_lm(user_query, limit=limit)
        except Exception as e:
            logging.error(f"LLM retrieval failed: {str(e)}")
            return []

    elif mode == "vector":
        try:
            return retrieve_ops_vector(user_query, limit=limit)
        except Exception as e:
            logging.error(f"Vector retrieval failed: {str(e)}")
            return []

    elif mode == "auto":
        try:
            return await retrieve_ops_lm(user_query, limit=limit)
        except Exception as e:
            import traceback

            logging.error(traceback.format_exc())
            try:
                return retrieve_ops_vector(user_query, limit=limit)
            except Exception as fallback_e:
                logging.error(
                    f"Tool retrieval failed: {str(e)}, "
                    f"fallback retrieval also failed: {str(fallback_e)}"
                )
                return []

    else:
        raise ValueError(f"Invalid mode: {mode}. Must be 'llm', 'vector', or 'auto'")
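The `auto` mode is a try-LLM-then-fall-back pattern. Stripped of the real model and FAISS calls, its control flow reduces to the sketch below; the dummy retrievers and the `retrieve_auto` name are stand-ins, not the module's functions:

```python
import asyncio


async def llm_retrieve(query: str) -> list:
    # Stand-in for retrieve_ops_lm; simulate a backend outage
    raise RuntimeError("LLM backend unavailable")


def vector_retrieve(query: str) -> list:
    # Stand-in for retrieve_ops_vector; always succeeds locally
    return ["clean_html_mapper", "text_length_filter"]


async def retrieve_auto(query: str) -> list:
    try:
        return await llm_retrieve(query)
    except Exception:
        # Primary path failed; fall back to the local vector index
        try:
            return vector_retrieve(query)
        except Exception:
            # Both paths failed; degrade gracefully to an empty result
            return []


result = asyncio.run(retrieve_auto("clean text"))
print(result)  # ['clean_html_mapper', 'text_length_filter']
```

Returning an empty list on double failure (rather than raising) lets the calling agent proceed with no retrieved operators instead of crashing the whole pipeline.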
if __name__ == "__main__":
    import asyncio

    user_query = (
        "Clean special characters from text and filter samples with excessive length. "
        "Mask sensitive information and filter unsafe content including adult/terror-related terms. "
        "Additionally, filter out small images, perform image tagging, and remove duplicate images."
    )

    # Test the different modes
    print("=== Testing LLM mode ===")
    tool_names_llm = asyncio.run(retrieve_ops(user_query, limit=10, mode="llm"))
    print("Retrieved tool names (LLM):")
    print(tool_names_llm)

    print("\n=== Testing Vector mode ===")
    tool_names_vector = asyncio.run(retrieve_ops(user_query, limit=10, mode="vector"))
    print("Retrieved tool names (Vector):")
    print(tool_names_vector)

    print("\n=== Testing Auto mode (default) ===")
    tool_names_auto = asyncio.run(retrieve_ops(user_query, limit=10, mode="auto"))
    print("Retrieved tool names (Auto):")
    print(tool_names_auto)
62
data_juicer_agent/tools/router_helpers.py
Normal file
@@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
"""Router agent helpers using implicit routing"""
from typing import Callable, Optional

from agentscope.agent import AgentBase
from agentscope.message import Msg
from agentscope.tool import ToolResponse


def agent_to_tool(
    agent: AgentBase, tool_name: Optional[str] = None, description: Optional[str] = None
) -> Callable:
    """
    Convert any agent into a tool function that can be registered in a toolkit.

    Args:
        agent: The agent instance to convert
        tool_name: Optional custom tool name (defaults to agent.name)
        description: Optional tool description (defaults to the agent's docstring or sys_prompt)

    Returns:
        A tool function that can be registered with toolkit.register_tool_function()
    """
    # Get the tool name and description
    if tool_name is None:
        tool_name = getattr(agent, "name", "agent_tool")

    if description is None:
        # Try to get the description from the agent's docstring or sys_prompt
        if hasattr(agent, "__doc__") and agent.__doc__:
            description = agent.__doc__.strip()
        elif hasattr(agent, "sys_prompt"):
            description = f"Agent: {agent.sys_prompt[:100]}..."
        elif hasattr(agent, "_sys_prompt"):
            description = f"Agent: {agent._sys_prompt[:100]}..."
        else:
            description = f"Tool function for {tool_name}"

    async def tool_function(task: str) -> ToolResponse:
        # Create a message and call the agent
        msg = Msg("user", task, "user")
        result = await agent(msg)

        # Extract content from the result
        if hasattr(result, "get_content_blocks"):
            content = result.get_content_blocks("text")
            return ToolResponse(
                content=content,
                metadata={
                    "agent_name": getattr(agent, "name", "unknown"),
                    "task": task,
                },
            )
        else:
            raise ValueError(f"Not a valid Msg object: {result}")

    # Set the function name and docstring
    tool_function.__name__ = f"call_{tool_name.lower().replace(' ', '_')}"
    tool_function.__doc__ = (
        f"{description}\n\nArgs:\n    task (str): The task for {tool_name} to handle"
    )

    return tool_function
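`agent_to_tool` derives the exposed function name from the agent's display name so the router can call each wrapped agent by a predictable identifier. The naming rule itself is plain string munging (the standalone helper name below is illustrative):

```python
def derive_tool_function_name(tool_name: str) -> str:
    """Mirror the naming rule used inside agent_to_tool."""
    # Lowercase and replace spaces so the result is a valid Python identifier
    return f"call_{tool_name.lower().replace(' ', '_')}"


print(derive_tool_function_name("Data Juicer Dev"))  # call_data_juicer_dev
print(derive_tool_function_name("Router"))           # call_router
```

A stable, identifier-safe name matters because the toolkit exposes it to the model as the callable tool's name.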
231
tests/data_juicer_agent_test.py
Normal file
@@ -0,0 +1,231 @@
# -*- coding: utf-8 -*-
import os
import sys
from pathlib import Path

root_path = Path(__file__).parent.parent
sys.path.insert(0, str(root_path))
sys.path.insert(0, str(Path(root_path) / "data_juicer_agent"))

import pytest
from unittest.mock import AsyncMock, Mock, patch
from agentscope.agent import ReActAgent
from agentscope.model import DashScopeChatModel
from agentscope.tool import Toolkit
from agentscope.message import Msg
from agentscope.formatter import DashScopeChatFormatter
from agentscope.memory import InMemoryMemory
from agentscope.tool import (
    view_text_file,
    write_text_file,
)

# Import the main function and related components
from data_juicer_agent.main import main
from data_juicer_agent.agent_factory import create_agent
from data_juicer_agent.tools import (
    dj_toolkit,
    dj_dev_toolkit,
    dj_tools,
    dj_dev_tools,
    mcp_tools,
    get_mcp_toolkit,
    execute_safe_command,
    query_dj_operators,
    get_basic_files,
    get_operator_example,
    configure_data_juicer_path,
)


@pytest.fixture
def mock_toolkit():
    """Create a mocked Toolkit instance"""
    return Mock(spec=Toolkit)


@pytest.fixture
def mock_model():
    """Create a mocked DashScopeChatModel"""
    model = Mock(spec=DashScopeChatModel)
    model.call = AsyncMock(
        return_value=Msg("assistant", "test response", role="assistant"),
    )
    return model


@pytest.fixture
def mock_formatter():
    """Create a mocked DashScopeChatFormatter"""
    return Mock(spec=DashScopeChatFormatter)


@pytest.fixture
def mock_memory():
    """Create a mocked InMemoryMemory"""
    return Mock(spec=InMemoryMemory)


@pytest.fixture
def mock_mcp_client():
    """Create a mocked MCP client"""
    mock_client = Mock()
    mock_client.name = "DJ_recipe_flow"
    mock_client.connect = AsyncMock()
    mock_client.close = AsyncMock()
    mock_client.get_callable_function = AsyncMock()
    mock_client.list_tools = AsyncMock()
    return mock_client


@pytest.fixture
def mock_agent(
    mock_model,
    mock_formatter,
    mock_toolkit,
    mock_memory,
):
    """Create a mocked ReActAgent instance"""
    agent = Mock(spec=ReActAgent)
    agent.model = mock_model
    agent.formatter = mock_formatter
    agent.toolkit = mock_toolkit
    agent.memory = mock_memory
    agent.__call__ = AsyncMock(
        return_value=Msg("assistant", "test response", role="assistant"),
    )
    return agent
class TestDataJuicerAgent:
    """Test suite for the data_juicer_agent functionality"""

    def create_named_mock_agent(self, name, mock_agent, *args, **kwargs):
        """Create a named mock agent for testing"""
        agent_instance = Mock(spec=ReActAgent)
        agent_instance.model = mock_agent.model
        agent_instance.formatter = mock_agent.formatter
        agent_instance.toolkit = mock_agent.toolkit
        agent_instance.memory = mock_agent.memory
        agent_instance.__call__ = mock_agent.__call__
        agent_instance.name = name
        return agent_instance

    async def mock_user_func(self, msg=None):
        return Msg("user", "exit", role="user")

    def test_dj_toolkit_initialization(self):
        """Test DJ toolkit initialization and tool registration"""
        assert dj_toolkit.tools.get("execute_safe_command") is not None
        assert dj_toolkit.tools.get("view_text_file") is not None
        assert dj_toolkit.tools.get("write_text_file") is not None
        assert dj_toolkit.tools.get("query_dj_operators") is not None

        # Verify the tool list contains the expected tools
        expected_tools = [
            execute_safe_command,
            view_text_file,
            write_text_file,
            query_dj_operators,
        ]
        assert len(dj_tools) == len(expected_tools)
        for tool in expected_tools:
            assert tool in dj_tools

    def test_dj_dev_toolkit_initialization(self):
        """Test DJ development toolkit initialization and tool registration"""
        assert dj_dev_toolkit.tools.get("view_text_file") is not None
        assert dj_dev_toolkit.tools.get("write_text_file") is not None
        assert dj_dev_toolkit.tools.get("get_basic_files") is not None
        assert dj_dev_toolkit.tools.get("get_operator_example") is not None
        assert dj_dev_toolkit.tools.get("configure_data_juicer_path") is not None

        # Verify the tool list contains the expected tools
        expected_tools = [
            view_text_file,
            write_text_file,
            get_basic_files,
            get_operator_example,
            configure_data_juicer_path,
        ]
        assert len(dj_dev_tools) == len(expected_tools)
        for tool in expected_tools:
            assert tool in dj_dev_tools

    @pytest.mark.asyncio
    async def test_mcp_tools_list(self, mock_mcp_client):
        """Test that the MCP tools list contains the expected tools and MCP client binding"""
        with patch(
            "agentscope.mcp.HttpStatefulClient",
            return_value=mock_mcp_client,
        ) as mock_client_cls:
            await get_mcp_toolkit()
            mock_client_cls.assert_called_once()

        expected_tools = [view_text_file, write_text_file]
        assert len(mcp_tools) == len(expected_tools)
        for tool in expected_tools:
            assert tool in mcp_tools

    @pytest.mark.asyncio
    async def test_agent_initialization(
        self,
        mock_model,
        mock_formatter,
        mock_toolkit,
        mock_memory,
    ):
        """Test ReActAgent initialization"""
        with patch.dict(os.environ, {"DASHSCOPE_API_KEY": "test_key"}):
            agent = create_agent(
                name="DataJuicer",
                sys_prompt="You are {name}, an agent.",
                toolkit=mock_toolkit,
                description="test description",
                model=mock_model,
                formatter=mock_formatter,
                memory=mock_memory,
            )

            assert agent.name == "DataJuicer"
            assert "DataJuicer" in agent.sys_prompt
            assert "test" in agent.__doc__
            assert agent.model == mock_model
            assert agent.formatter == mock_formatter
            assert agent.toolkit == mock_toolkit
            assert agent.memory == mock_memory
            assert isinstance(agent, ReActAgent)

    @pytest.mark.asyncio
    async def test_main_with_multiple_agents_loading(self, mock_agent, mock_mcp_client):
        """Test that the main function loads multiple agents successfully"""
        with patch.dict(os.environ, {"DASHSCOPE_API_KEY": "test_key"}):
            mock_mcp_clients = [mock_mcp_client]

            with patch(
                "data_juicer_agent.tools.mcp_helpers._create_clients",
                return_value=mock_mcp_clients,
            ):
                with patch(
                    "data_juicer_agent.main.create_agent",
                    side_effect=lambda name, *args, **kwargs: self.create_named_mock_agent(
                        name, mock_agent, *args, **kwargs
                    ),
                ) as mock_create_agent:
                    with patch(
                        "data_juicer_agent.main.user", side_effect=self.mock_user_func
                    ):
                        await main(
                            use_studio=False,
                            available_agents=["dj", "dj_dev", "dj_mcp"],
                            retrieval_mode="auto",
                        )

                    # Validate that multiple agents are correctly created
                    # (dj, dj_dev, dj_mcp, and the router)
                    assert mock_create_agent.call_count == 4


if __name__ == "__main__":
    pytest.main(["-v", __file__])