diff --git a/README.md b/README.md
index d9b8f98..f16f8a5 100644
--- a/README.md
+++ b/README.md
@@ -90,6 +90,8 @@ It includes **agent deployment** and **secure sandboxed tool execution**, and ca
│ ├── multiagent_concurrent/ # Concurrent multi-agent task execution
│ └── meta_planner_agent/ # Planning agent with tool orchestration
│
+├── data_juicer_agent/ # Data processing multi-agent system
+│
├── sample_template/ # Template for new sample contributions
└── README.md
```
@@ -119,6 +121,23 @@ It includes **agent deployment** and **secure sandboxed tool execution**, and ca
| | functionality/plan | ✅ | ❌ | Task planning with ReAct agent |
| | functionality/rag | ✅ | ❌ | Retrieval-Augmented Generation (RAG) integration |
| | functionality/stream_printing_messages | ✅ | ❌ | Real-time message streaming and printing |
+| **Data Processing** | data_juicer_agent/ | ✅ | ❌ | Multi-agent data processing with Data-Juicer |
+
+------
+
+## 🌟 Featured Examples
+
+### DataJuicer Agent
+
+A powerful multi-agent data processing system that leverages Data-Juicer's nearly 200 operators for intelligent data processing:
+
+- **Intelligent Query**: Find suitable operators from nearly 200 data processing operators
+- **Automated Pipeline**: Generate Data-Juicer YAML configurations from natural language
+- **Custom Development**: Create domain-specific operators with AI assistance
+- **Multiple Retrieval Modes**: LLM-based and vector-based operator matching
+- **MCP Integration**: Native Model Context Protocol support
+
+📖 **Documentation**: [English](data_juicer_agent/README.md) | [中文](data_juicer_agent/README_ZH.md)
------
diff --git a/README_zh.md b/README_zh.md
index 84f9735..d3ed51a 100644
--- a/README_zh.md
+++ b/README_zh.md
@@ -90,6 +90,8 @@ AgentScope Runtime 是一个**全面的运行时框架**,主要解决部署和
│ ├── multiagent_concurrent/ # 多 Agent 并发任务执行
│ └── meta_planner_agent/ # 带工具编排的计划 Agent
│
+├── data_juicer_agent/ # 数据处理多智能体系统
+│
├── sample_template/ # 新样例贡献模板
└── README.md
```
@@ -119,6 +121,23 @@ AgentScope Runtime 是一个**全面的运行时框架**,主要解决部署和
| | functionality/plan | ✅ | ❌ | 使用 ReAct Agent 规划任务 |
| | functionality/rag | ✅ | ❌ | 检索增强生成 (RAG) 集成 |
| | functionality/stream_printing_messages | ✅ | ❌ | 实时信息流输出与打印 |
+| **数据处理** | data_juicer_agent/ | ✅ | ❌ | 基于 Data-Juicer 的多智能体数据处理 |
+
+---
+
+## 🌟 特色示例
+
+### DataJuicer 智能体
+
+一个强大的数据处理多智能体系统,利用 Data-Juicer 的近 200 个算子进行智能数据处理:
+
+- **智能查询**:从近 200 个数据处理算子中找到合适的算子
+- **自动化流程**:从自然语言描述生成 Data-Juicer YAML 配置
+- **自定义开发**:通过 AI 辅助创建领域特定的算子
+- **多种检索模式**:基于 LLM 和向量的算子匹配
+- **MCP 集成**:原生模型上下文协议支持
+
+📖 **文档**:[English](data_juicer_agent/README.md) | [中文](data_juicer_agent/README_ZH.md)
---
diff --git a/data_juicer_agent/.gitignore b/data_juicer_agent/.gitignore
new file mode 100644
index 0000000..fd0b546
--- /dev/null
+++ b/data_juicer_agent/.gitignore
@@ -0,0 +1,126 @@
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+pip-wheel-metadata/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+db.sqlite3
+db.sqlite3-journal
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+.python-version
+
+# pipenv
+# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
+# However, in case of collaboration, if having platform-specific dependencies or dependencies
+# having no cross-platform support, pipenv may install dependencies that don't work, or not
+# install all needed dependencies.
+#Pipfile.lock
+
+# PEP 582; used by e.g. github.com/David-OConnor/pyflow
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+.idea/
+
+# macOS
+.DS_Store
+
+# Used to save loggings and files
+*runs/
+agentscope.db
+tmp*.json
+.vscode/
+data_agent/
+outputs/
+tools/op_manager/cache_retrieve/
+tools/op_manager/vector_index_cache/
\ No newline at end of file
diff --git a/data_juicer_agent/README.md b/data_juicer_agent/README.md
new file mode 100644
index 0000000..bd357e3
--- /dev/null
+++ b/data_juicer_agent/README.md
@@ -0,0 +1,254 @@
+# DataJuicer Agent
+
+A multi-agent data processing system built on [AgentScope](https://github.com/modelscope/agentscope) and [Data-Juicer (DJ)](https://github.com/modelscope/data-juicer). This project demonstrates how to use the natural language understanding of large language models so that non-expert users can easily harness Data-Juicer's powerful data processing capabilities.
+
+## 📋 Table of Contents
+
+- [📋 Table of Contents](#-table-of-contents)
+- [What Does This Agent Do?](#what-does-this-agent-do)
+- [Architecture](#architecture)
+- [Quick Start](#quick-start)
+ - [System Requirements](#system-requirements)
+ - [Installation](#installation)
+ - [Configuration](#configuration)
+ - [Usage](#usage)
+- [Agent Introduction](#agent-introduction)
+ - [Data Processing Agent](#data-processing-agent)
+ - [Code Development Agent (DJ Dev Agent)](#code-development-agent-dj-dev-agent)
+- [Advanced Features](#advanced-features)
+ - [Operator Retrieval](#operator-retrieval)
+ - [Retrieval Modes](#retrieval-modes)
+ - [Usage](#usage-1)
+ - [MCP Agent](#mcp-agent)
+ - [MCP Server Types](#mcp-server-types)
+ - [Configuration](#configuration-1)
+ - [Usage Methods](#usage-methods)
+- [Feature Preview](#feature-preview)
+ - [Data-Juicer Q\&A Agent (Demo Available)](#data-juicer-qa-agent-demo-available)
+ - [Data Analysis and Visualization Agent (In Development)](#data-analysis-and-visualization-agent-in-development)
+- [Troubleshooting](#troubleshooting)
+ - [Common Issues](#common-issues)
+ - [Optimization Recommendations](#optimization-recommendations)
+
+## What Does This Agent Do?
+
+Data-Juicer (DJ) is a one-stop text and multimodal data processing system for large language models. It provides nearly 200 core data processing operators, covering multimodal data such as text, images, and videos, and supports the full pipeline of data analysis, cleaning, and synthesis.
+
+After running this example, you can:
+- **Intelligent Query**: Find suitable operators from nearly 200 data processing operators for your data scenarios
+- **Automated Pipeline**: Describe your data processing needs, automatically generate Data-Juicer YAML configurations and execute them
+- **Custom Extension**: Quickly develop custom operators for specific scenarios
+
+## Architecture
+
+```
+User Query
+ ↓
+Router Agent ──┐
+               ├── Data Processing Agent (DJ Agent)
+               │       ├── General File Read/Write Tools
+               │       ├── query_dj_operators (Query DataJuicer operators)
+               │       └── execute_safe_command (Execute safe commands including dj-process, dj-analyze)
+               │
+               └── Code Development Agent (DJ Dev Agent)
+                       ├── General File Read/Write Tools
+                       ├── get_basic_files (Get basic development knowledge)
+                       ├── get_operator_example (Get operator source code examples related to requirements)
+                       └── configure_data_juicer_path (Configure DataJuicer path)
+```
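+
+The router itself is a ReAct agent whose "tools" are the sub-agents. A condensed sketch of the wiring, abridged from `main.py` and `agent_factory.py` in this sample (the full script also adds the dev and MCP agents):
+
+```python
+import os
+
+from agentscope.formatter import DashScopeChatFormatter
+from agentscope.memory import InMemoryMemory
+from agentscope.model import DashScopeChatModel
+
+from agent_factory import create_agent
+from prompts import DJ_SYS_PROMPT, ROUTER_SYS_PROMPT
+from tools import agents2toolkit, dj_toolkit
+
+model = DashScopeChatModel(
+    model_name="qwen-max",
+    api_key=os.environ["DASHSCOPE_API_KEY"],
+    stream=True,
+    enable_thinking=False,
+)
+formatter = DashScopeChatFormatter()
+
+# Specialized agent wired with the Data-Juicer toolkit
+dj_agent = create_agent(
+    "datajuicer_agent", DJ_SYS_PROMPT, dj_toolkit,
+    "Data processing agent", model, formatter, InMemoryMemory(),
+)
+
+# The router sees each sub-agent as one callable tool
+router = create_agent(
+    "Router", ROUTER_SYS_PROMPT, agents2toolkit([dj_agent]),
+    "Routes tasks to specialized agents", model, formatter, InMemoryMemory(),
+)
+```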
+
+## Quick Start
+
+### System Requirements
+
+- Python 3.11+
+- Valid DashScope API key
+- Optional: Data-Juicer source code (for custom operator development)
+
+### Installation
+
+```bash
+uv pip install -e .
+```
+
+### Configuration
+
+1. **Set API Key**
+
+```bash
+export DASHSCOPE_API_KEY="your-dashscope-key"
+```
+
+2. **Optional: Configure Data-Juicer Path (for custom operator development)**
+
+```bash
+export DATA_JUICER_PATH="your-data-juicer-path"
+```
+
+> **Tip**: You can also set this during runtime through conversation, for example:
+> - "Help me set the DataJuicer path: /path/to/data-juicer"
+> - "Help me update the DataJuicer path: /path/to/data-juicer"
+
+### Usage
+
+Choose the running mode using the `-u` or `--use_studio` parameter:
+
+```bash
+# Use AgentScope Studio (provides interactive interface)
+python main.py --use_studio true
+
+# Or use command-line mode (default)
+python main.py
+```
+
+## Agent Introduction
+
+### Data Processing Agent
+
+Responsible for interacting with Data-Juicer and executing actual data processing tasks. Supports automatic operator recommendation from natural language descriptions, configuration generation, and execution.
+
+**Typical Use Cases:**
+- **Data Cleaning**: Deduplication, removal of low-quality samples, format standardization
+- **Multimodal Processing**: Process text, image, and video data simultaneously
+- **Batch Conversion**: Format conversion, data augmentation, feature extraction
+
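+For example, asking it to "filter out samples whose text is shorter than 5 characters or whose images are smaller than 100×100" would lead the agent to retrieve the matching operators, then generate and run (via `dj-process`) a config along these lines; values are illustrative and follow the template in `prompts.py`:
+
+```yaml
+project_name: demo_cleanup
+dataset_path: ./data/demo-dataset-images.jsonl
+export_path: ./outputs/demo-processed.jsonl
+text_keys: text
+image_key: images
+np: 4
+skip_op_error: false
+
+process:
+  - text_length_filter:
+      min_len: 5
+  - image_shape_filter:
+      min_width: 100
+      min_height: 100
+```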
+
+![View Complete Example Log (from AgentScope Studio)](assets/dj_agent_image.png)
+
+
+
+### Code Development Agent (DJ Dev Agent)
+
+Assists in developing custom data processing operators, powered by the `qwen3-coder-480b-a35b-instruct` model by default.
+
+**Typical Use Cases:**
+- **Develop domain-specific filter or transformation operators**
+- **Integrate proprietary data processing logic**
+- **Extend Data-Juicer capabilities for specific scenarios**
+
+
+![View Complete Example Log (from AgentScope Studio)](assets/dj_dev_agent_image.png)
+
+
+
+## Advanced Features
+
+### Operator Retrieval
+
+DJ Agent implements an intelligent operator retrieval tool that quickly finds the most relevant operators among Data-Juicer's nearly 200 operators through a dedicated LLM query step. This is a key component enabling the data processing agent and the code development agent to work accurately.
+
+We provide three retrieval modes to choose from based on different scenarios:
+
+#### Retrieval Modes
+
+**LLM Retrieval (default)**
+- Uses the Qwen-Turbo model to match the most relevant operators
+- Provides detailed matching reasons and relevance scores
+- Suitable for scenarios requiring high-precision matching, but consumes more tokens
+
+**Vector Retrieval (vector)**
+- Based on DashScope text embedding and FAISS similarity search
+- Fast and efficient, suitable for large-scale retrieval scenarios
+
+**Auto Mode (auto)**
+- Prioritizes LLM retrieval, automatically falls back to vector retrieval on failure
+
+#### Usage
+
+Specify the retrieval mode using the `-r` or `--retrieval_mode` parameter:
+
+```bash
+python main.py --retrieval_mode vector
+```
+
+For more parameter descriptions, see `python main.py --help`.
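+
+The retrieval step can also be exercised on its own. A minimal sketch, assuming it is run from this sample's directory (so the `tools` package is importable) with `DASHSCOPE_API_KEY` set:
+
+```python
+import asyncio
+import os
+
+from tools.dj_tools import query_dj_operators
+
+# Same knob the --retrieval_mode CLI flag sets: "llm", "vector", or "auto"
+os.environ["RETRIEVAL_MODE"] = "vector"
+
+
+async def demo() -> None:
+    resp = await query_dj_operators("deduplicate near-identical documents", limit=5)
+    # ToolResponse.content is a list of dict-like text blocks
+    print(resp.content[0]["text"])
+
+
+asyncio.run(demo())
+```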
+
+### MCP Agent
+
+Data-Juicer provides MCP (Model Context Protocol) services through which the agent obtains operator information and executes data processing via native interfaces, with no separate LLM query step or command-line calls. This makes the setup easy to migrate and integrate.
+
+#### MCP Server Types
+
+Data-Juicer provides two MCP server modes:
+
+**Recipe-Flow (Data Recipe)**
+- Filter by operator type and tags
+- Support combining multiple operators into data recipes for execution
+
+**Granular-Operators (Fine-grained Operators)**
+- Provide each operator as an independent tool
+- Flexibly specify operator lists through environment variables
+- Build fully customized data processing pipelines
+
+For detailed information, please refer to: [Data-Juicer MCP Service Documentation](https://modelscope.github.io/data-juicer/en/main/docs/DJ_service.html#mcp-server)
+
+> **Note**: The Data-Juicer MCP server is currently in early development, and features and tools may change with ongoing development.
+
+#### Configuration
+
+Configure the service address in `configs/mcp_config.json`:
+
+```json
+{
+ "mcpServers": {
+ "DJ_recipe_flow": {
+ "url": "http://127.0.0.1:8080/sse"
+ }
+ }
+}
+```
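+
+The loader in `tools/mcp_tools.py` also accepts stdio-based servers (`command`/`args`/`env`) and, for HTTP entries, optional `transport` and `stateful` keys. For example, to launch the Recipe-Flow server locally over stdio (the script path is illustrative):
+
+```json
+{
+  "mcpServers": {
+    "DJ_recipe_flow": {
+      "command": "python",
+      "args": ["/path/to/data-juicer/tools/DJ_mcp_recipe_flow.py"],
+      "env": {"SERVER_TRANSPORT": "stdio"}
+    }
+  }
+}
+```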
+
+#### Usage Methods
+
+Enable the MCP Agent in place of the DJ Agent:
+
+```bash
+# Enable MCP Agent and Dev Agent (quote the list so the shell passes it as one argument)
+python main.py --available_agents "[dj_mcp, dj_dev]"
+
+# Or use shorthand
+python main.py -a "[dj_mcp, dj_dev]"
+```
+
+## Feature Preview
+
+The Data-Juicer agent ecosystem is rapidly expanding. Here are the new agents currently in development or planned:
+
+### Data-Juicer Q&A Agent (Demo Available)
+
+Provides users with detailed answers about Data-Juicer operators, concepts, and best practices.
+
+
+
+### Data Analysis and Visualization Agent (In Development)
+
+Generates data analysis and visualization results, expected to be released soon.
+
+## Troubleshooting
+
+### Common Issues
+
+**Q: How do I get a DashScope API key?**
+A: Visit the [DashScope official website](https://dashscope.aliyun.com/) to register an account and apply for an API key.
+
+**Q: Why does operator retrieval fail?**
+A: Check your network connection and API key configuration, or try switching to the vector retrieval mode.
+
+**Q: How do I debug custom operators?**
+A: Ensure the Data-Juicer path is configured correctly, and check the example code provided by the code development agent.
+
+**Q: What should I do if the MCP service connection fails?**
+A: Check whether the MCP server is running and confirm that the URL in the configuration file is correct.
+
+### Optimization Recommendations
+
+- For large-scale data processing, it is recommended to use Data-Juicer's distributed mode
+- Set the batch size appropriately to balance memory usage and processing speed
+- For more advanced data processing features (synthesis, Data-Model Co-Development), refer to the Data-Juicer [documentation](https://modelscope.github.io/data-juicer/en/main/index.html)
+
+---
+
+**Contributing**: Issues and Pull Requests to improve AgentScope, DataJuicer Agent, and [DataJuicer](https://modelscope.github.io/data-juicer/en/main/index.html#contribution-and-acknowledgements) are welcome. If you encounter problems during use or have feature suggestions, please feel free to contact us.
\ No newline at end of file
diff --git a/data_juicer_agent/README_ZH.md b/data_juicer_agent/README_ZH.md
new file mode 100644
index 0000000..4aa2a62
--- /dev/null
+++ b/data_juicer_agent/README_ZH.md
@@ -0,0 +1,253 @@
+# DataJuicer 智能体
+
+基于 [AgentScope](https://github.com/modelscope/agentscope) 和 [Data-Juicer (DJ)](https://github.com/modelscope/data-juicer) 构建的数据处理多智能体系统。该项目展示了如何利用大模型的自然语言理解能力,让非专家用户也能轻松使用 Data-Juicer 的强大数据处理能力。
+
+## 📋 目录
+
+- [📋 目录](#-目录)
+- [这个智能体做了什么?](#这个智能体做了什么)
+- [架构](#架构)
+- [快速开始](#快速开始)
+ - [系统要求](#系统要求)
+ - [安装](#安装)
+ - [配置](#配置)
+ - [使用](#使用)
+- [智能体介绍](#智能体介绍)
+ - [数据处理智能体](#数据处理智能体)
+ - [代码开发智能体](#代码开发智能体)
+- [高级功能](#高级功能)
+ - [算子检索](#算子检索)
+ - [检索模式](#检索模式)
+ - [使用](#使用-1)
+ - [MCP 智能体](#mcp-智能体)
+ - [MCP 服务器类型](#mcp-服务器类型)
+ - [配置](#配置-1)
+ - [使用方法](#使用方法)
+- [功能预览](#功能预览)
+  - [Data-Juicer 问答智能体 (演示可用)](#data-juicer-问答智能体-演示可用)
+  - [数据分析与可视化智能体 (开发中)](#数据分析与可视化智能体-开发中)
+- [故障排除](#故障排除)
+  - [常见问题](#常见问题)
+  - [优化建议](#优化建议)
+
+## 这个智能体做了什么?
+
+Data-Juicer (DJ) 是一个面向大模型的一站式文本及多模态数据处理系统。它提供了近 200 个核心数据处理算子,覆盖文本、图像、视频等多模态数据,支持数据分析、清洗、合成等全流程。
+
+运行本示例后,您可以:
+- **智能查询**:从近200个数据处理算子中找到适合您数据场景的算子
+- **自动化流程**:描述数据处理需求,自动生成 Data-Juicer YAML 配置并执行
+- **自定义扩展**:为特定场景快速开发自定义算子
+
+## 架构
+
+```
+用户查询
+ ↓
+路由智能体 ──┐
+             ├── 数据处理智能体 (DJ 智能体)
+             │       ├── 通用文件读写工具
+             │       ├── query_dj_operators (查询 DataJuicer 算子)
+             │       └── execute_safe_command (执行包含 dj-process、dj-analyze 在内的安全命令)
+             │
+             └── 代码开发智能体 (DJ Dev 智能体)
+                     ├── 通用文件读写工具
+                     ├── get_basic_files (获取基础的开发知识)
+                     ├── get_operator_example (获取与需求相关的算子源码示例)
+                     └── configure_data_juicer_path (配置 DataJuicer 路径)
+```
+
+## 快速开始
+
+### 系统要求
+
+- Python 3.11+
+- 有效的 DashScope API 密钥
+- 可选:Data-Juicer 源码(用于自定义算子开发)
+
+### 安装
+
+```bash
+uv pip install -e .
+```
+
+### 配置
+
+1. **设置 API 密钥**
+
+```bash
+export DASHSCOPE_API_KEY="your-dashscope-key"
+```
+
+2. **可选:配置 Data-Juicer 路径(用于自定义算子开发)**
+
+```bash
+export DATA_JUICER_PATH="your-data-juicer-path"
+```
+
+> **提示**:也可以在运行时通过对话设置,例如:
+> - "帮我设置 DataJuicer 路径:/path/to/data-juicer"
+> - "帮我更新 DataJuicer 路径:/path/to/data-juicer"
+
+### 使用
+
+通过 `-u` 或 `--use_studio` 参数选择运行方式:
+
+```bash
+# 使用 AgentScope Studio(提供交互式界面)
+python main.py --use_studio true
+
+# 或使用命令行模式(默认)
+python main.py
+```
+
+## 智能体介绍
+
+### 数据处理智能体
+
+负责与 Data-Juicer 交互,执行实际的数据处理任务。支持从自然语言描述自动推荐算子、生成配置并执行。
+
+**典型用途:**
+- **数据清洗**:去重、移除低质量样本、格式标准化
+- **多模态处理**:同时处理文本、图像、视频数据
+- **批量转换**:格式转换、数据增强、特征提取
+
+
+![查看完整示例日志(来自 AgentScope Studio)](assets/dj_agent_image.png)
+
+
+
+### 代码开发智能体
+
+辅助开发自定义数据处理算子,默认使用 `qwen3-coder-480b-a35b-instruct` 模型驱动。
+
+**典型用途:**
+- **开发领域特定的过滤或转换算子**
+- **集成自有的数据处理逻辑**
+- **为特定场景扩展 Data-Juicer 能力**
+
+
+![查看完整示例日志(来自 AgentScope Studio)](assets/dj_dev_agent_image.png)
+
+
+
+## 高级功能
+
+### 算子检索
+
+DJ 智能体实现了一个智能算子检索工具,通过独立的 LLM 查询环节从 Data-Juicer 的近200个算子中快速找到最相关的算子。这是数据处理智能体和代码开发智能体能够准确运行的关键组件。
+
+我们提供了三种检索模式,可根据不同场景选用:
+
+#### 检索模式
+
+**LLM 检索 (默认)**
+- 使用 Qwen-Turbo 模型匹配最相关算子
+- 提供详细的匹配理由和相关性评分
+- 适合需要高精度匹配的场景,但消耗更多 Token
+
+**向量检索 (vector)**
+- 基于 DashScope 文本嵌入和 FAISS 相似度搜索
+- 快速且高效,适合大规模检索场景
+
+**自动模式 (auto)**
+- 优先尝试 LLM 检索,失败时自动降级到向量检索
+
+#### 使用
+
+通过 `-r` 或 `--retrieval_mode` 参数指定检索模式:
+
+```bash
+python main.py --retrieval_mode vector
+```
+
+更多参数说明见 `python main.py --help`
+
+### MCP 智能体
+
+Data-Juicer 提供了 MCP (Model Context Protocol) 服务,可直接通过原生接口获取算子信息、执行数据处理,易于迁移和集成,无需单独的 LLM 查询和命令行调用。
+
+#### MCP 服务器类型
+
+Data-Juicer 提供两种 MCP 服务器模式:
+
+**Recipe-Flow(数据菜谱)**
+- 根据算子类型和标签进行筛选
+- 支持将多个算子组合成数据菜谱运行
+
+**Granular-Operators(细粒度算子)**
+- 将每个算子作为独立工具提供
+- 通过环境变量灵活指定算子列表
+- 构建完全定制化的数据处理管道
+
+详细信息请参考:[Data-Juicer MCP 服务文档](https://modelscope.github.io/data-juicer/en/main/docs/DJ_service.html#mcp-server)
+
+> **注意**:Data-Juicer MCP 服务器目前处于早期开发阶段,功能和工具可能会随着持续开发而变化。
+
+#### 配置
+
+在 `configs/mcp_config.json` 中配置服务地址:
+
+```json
+{
+ "mcpServers": {
+ "DJ_recipe_flow": {
+ "url": "http://127.0.0.1:8080/sse"
+ }
+ }
+}
+```
+
+#### 使用方法
+
+启用 MCP 智能体替代 DJ 智能体:
+
+```bash
+# 启用 MCP 智能体和开发智能体(给列表加引号,确保 shell 将其作为单个参数传入)
+python main.py --available_agents "[dj_mcp, dj_dev]"
+
+# 或使用简写
+python main.py -a "[dj_mcp, dj_dev]"
+```
+
+
+## 功能预览
+
+Data-Juicer 智能体生态系统正在快速扩展,以下是当前正在开发或计划中的新智能体:
+
+### Data-Juicer 问答智能体 (演示可用)
+
+为用户提供关于 Data-Juicer 算子、概念和最佳实践的详细解答。
+
+
+
+### 数据分析与可视化智能体 (开发中)
+
+生成数据分析和可视化结果,预计近期发布。
+
+## 故障排除
+
+### 常见问题
+
+**Q: 如何获取 DashScope API 密钥?**
+A: 访问 [DashScope 官网](https://dashscope.aliyun.com/) 注册账号并申请 API 密钥。
+
+**Q: 为什么算子检索失败?**
+A: 请检查网络连接和 API 密钥配置,或尝试切换到向量检索模式。
+
+**Q: 如何调试自定义算子?**
+A: 确保 Data-Juicer 路径配置正确,并查看代码开发智能体提供的示例代码。
+
+**Q: MCP 服务连接失败怎么办?**
+A: 检查 MCP 服务器是否正在运行,确认配置文件中的 URL 地址正确。
+
+### 优化建议
+
+- 对于大规模数据处理,建议使用 Data-Juicer 提供的分布式模式
+- 合理设置批处理大小,以平衡内存使用和处理速度
+- 更多进阶数据处理特性(合成、Data-Model Co-Development 等)请参考 Data-Juicer [文档页](https://modelscope.github.io/data-juicer/zh_CN/main/index_ZH)
+
+
+---
+
+**贡献指南**:欢迎提交 Issue 和 Pull Request 来改进 AgentScope、DataJuicer Agent 及 [DataJuicer](https://modelscope.github.io/data-juicer/zh_CN/main/index_ZH#id4)。如果您在使用过程中遇到问题或有功能建议,请随时联系我们。
diff --git a/data_juicer_agent/agent_factory.py b/data_juicer_agent/agent_factory.py
new file mode 100644
index 0000000..830bfed
--- /dev/null
+++ b/data_juicer_agent/agent_factory.py
@@ -0,0 +1,92 @@
+# -*- coding: utf-8 -*-
+"""
+Agent Factory
+
+Factory functions for creating and configuring agents with standardized toolkits.
+"""
+
+import os
+from typing import Optional
+from agentscope.agent import ReActAgent
+from agentscope.tool import Toolkit
+from agentscope.formatter import FormatterBase, OpenAIChatFormatter
+from agentscope.model import ChatModelBase, OpenAIChatModel
+from agentscope.memory import InMemoryMemory, MemoryBase
+
+
+# Default configurations
+DEFAULT_MODEL_CONFIG = {
+ "model_name": "gpt-4o",
+ "stream": False,
+}
+
+
+def get_default_model() -> OpenAIChatModel:
+ """Create default OpenAI model instance."""
+ api_key = os.environ.get("OPENAI_API_KEY")
+ if not api_key:
+ raise ValueError("OPENAI_API_KEY environment variable is required")
+
+ return OpenAIChatModel(api_key=api_key, **DEFAULT_MODEL_CONFIG)
+
+
+def create_agent(
+ name: str,
+ sys_prompt: str,
+ toolkit: Toolkit,
+ description: Optional[str] = None,
+ model: Optional[ChatModelBase] = None,
+ formatter: Optional[FormatterBase] = None,
+ memory: Optional[MemoryBase] = None,
+ max_iters: int = 10,
+ parallel_tool_calls: bool = False,
+ **kwargs,
+) -> ReActAgent:
+ """
+ Create a ReActAgent with standardized configuration.
+
+    Args:
+        name: Agent identifier
+        sys_prompt: System prompt template (supports {name} placeholder)
+        toolkit: Toolkit instance providing the agent's tools
+        description: Optional human-readable description attached to the agent
+        model: Language model (defaults to GPT-4o)
+        formatter: Message formatter (defaults to OpenAIChatFormatter)
+        memory: Memory instance (defaults to InMemoryMemory)
+        max_iters: Maximum reasoning iterations
+        parallel_tool_calls: Enable parallel tool execution
+        **kwargs: Additional ReActAgent arguments
+
+    Returns:
+        Configured ReActAgent instance
+
+    Example:
+        >>> agent = create_agent(
+        ...     name="sql_expert",
+        ...     sys_prompt="You are {name}, a SQL database expert",
+        ...     toolkit=sql_toolkit,
+        ... )
+ """
+ # Set defaults
+ if model is None:
+ model = get_default_model()
+ if formatter is None:
+ formatter = OpenAIChatFormatter()
+ if memory is None:
+ memory = InMemoryMemory()
+
+ # Create agent
+ agent = ReActAgent(
+ name=name,
+ sys_prompt=sys_prompt.format(name=name),
+ model=model,
+ formatter=formatter,
+ toolkit=toolkit,
+ memory=memory,
+ max_iters=max_iters,
+ parallel_tool_calls=parallel_tool_calls,
+ **kwargs,
+ )
+
+ agent.__doc__ = description
+
+ return agent
diff --git a/data_juicer_agent/assets/dj_agent_image.png b/data_juicer_agent/assets/dj_agent_image.png
new file mode 100644
index 0000000..d8fb2d0
Binary files /dev/null and b/data_juicer_agent/assets/dj_agent_image.png differ
diff --git a/data_juicer_agent/assets/dj_dev_agent_image.png b/data_juicer_agent/assets/dj_dev_agent_image.png
new file mode 100644
index 0000000..48ea8c6
Binary files /dev/null and b/data_juicer_agent/assets/dj_dev_agent_image.png differ
diff --git a/data_juicer_agent/configs/mcp_config.json b/data_juicer_agent/configs/mcp_config.json
new file mode 100644
index 0000000..f8cdb62
--- /dev/null
+++ b/data_juicer_agent/configs/mcp_config.json
@@ -0,0 +1,7 @@
+{
+ "mcpServers": {
+ "DJ_recipe_flow": {
+ "url": "http://127.0.0.1:8080/sse"
+ }
+ }
+}
\ No newline at end of file
diff --git a/data_juicer_agent/data/demo-dataset-images.jsonl b/data_juicer_agent/data/demo-dataset-images.jsonl
new file mode 100644
index 0000000..c91f716
--- /dev/null
+++ b/data_juicer_agent/data/demo-dataset-images.jsonl
@@ -0,0 +1,3 @@
+{"images":["./images/img1.png"], "text": "<__dj__image> A comfortable bed."}
+{"images":["./images/img2.jpg"], "text": "<__dj__image> A bus."}
+{"images":["./images/img3.jpg"], "text": "<__dj__image> Black and white photograph of a woman holding an umbrella."}
diff --git a/data_juicer_agent/data/images/img1.png b/data_juicer_agent/data/images/img1.png
new file mode 100644
index 0000000..8d9e70b
Binary files /dev/null and b/data_juicer_agent/data/images/img1.png differ
diff --git a/data_juicer_agent/data/images/img2.jpg b/data_juicer_agent/data/images/img2.jpg
new file mode 100644
index 0000000..8595513
Binary files /dev/null and b/data_juicer_agent/data/images/img2.jpg differ
diff --git a/data_juicer_agent/data/images/img3.jpg b/data_juicer_agent/data/images/img3.jpg
new file mode 100644
index 0000000..e0de8b1
Binary files /dev/null and b/data_juicer_agent/data/images/img3.jpg differ
diff --git a/data_juicer_agent/main.py b/data_juicer_agent/main.py
new file mode 100644
index 0000000..acdcf90
--- /dev/null
+++ b/data_juicer_agent/main.py
@@ -0,0 +1,154 @@
+# -*- coding: utf-8 -*-
+import os
+import fire
+from typing import List
+
+from agentscope.model import DashScopeChatModel
+from agentscope.formatter import DashScopeChatFormatter
+from agentscope.memory import InMemoryMemory
+from agentscope.agent import UserAgent
+from agentscope.tool import Toolkit
+
+from agent_factory import create_agent
+from prompts import DJ_SYS_PROMPT, DJ_DEV_SYS_PROMPT, ROUTER_SYS_PROMPT, MCP_SYS_PROMPT
+from tools import dj_toolkit, dj_dev_toolkit, mcp_tools, get_mcp_toolkit, agents2toolkit
+
+# Create shared configuration
+model = DashScopeChatModel(
+ model_name="qwen-max",
+ api_key=os.environ["DASHSCOPE_API_KEY"],
+ stream=True,
+ enable_thinking=False,
+)
+
+dev_model = DashScopeChatModel(
+ model_name="qwen3-coder-480b-a35b-instruct",
+ api_key=os.environ["DASHSCOPE_API_KEY"],
+ stream=True,
+ enable_thinking=False,
+)
+
+formatter = DashScopeChatFormatter()
+memory = InMemoryMemory()
+
+user = UserAgent("User")
+
+
+async def main(
+ use_studio: bool = False,
+ available_agents: List[str] = ["dj", "dj_dev"],
+ retrieval_mode: str = "auto",
+):
+ """
+ Main function for running the agent.
+
+ :param use_studio: Whether to use agentscope studio.
+ :param available_agents: List of available agents. Options: dj, dj_dev, dj_mcp
+ :param retrieval_mode: Retrieval mode for operators. Options: auto, vector, llm
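+
+    Example:
+        python main.py --retrieval_mode vector --available_agents "[dj,dj_dev]"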
+ """
+
+ if "dj" in available_agents:
+ # Set global retrieval mode for tools to use
+ os.environ["RETRIEVAL_MODE"] = retrieval_mode
+ print(f"Using retrieval mode: {retrieval_mode}")
+
+ agents = []
+ for agent_name in available_agents:
+ if agent_name == "dj":
+ # Create agents using unified create_agent function
+ dj_agent = create_agent(
+ "datajuicer_agent",
+ DJ_SYS_PROMPT,
+ dj_toolkit,
+ (
+ "A professional data preprocessing AI assistant with the following core capabilities: \n"
+ "Tool Matching \n"
+ "- Query and validate suitable DataJuicer operators; \n"
+ "Configuration Generation \n"
+ "- Create YAML configuration files and preview data; \n"
+ "Task Execution - Run data processing pipelines and output results"
+ ),
+ model,
+ formatter,
+ memory,
+ )
+ agents.append(dj_agent)
+
+ if agent_name == "dj_dev":
+ # DJ Development Agent for operator development
+ dj_dev_agent = create_agent(
+ "dj_dev_agent",
+ DJ_DEV_SYS_PROMPT,
+ dj_dev_toolkit,
+ (
+ "An expert DataJuicer development assistant specializing in creating new DataJuicer operators. \n"
+ "Core capabilities: \n"
+ "Reference Retrieval - fetch base classes and examples; \n"
+ "Environment Configuration - handle DATA_JUICER_PATH setup. if user provides a DataJuicer path requiring setup/update, please call this agent;\n; "
+ "Code Generation - write complete, convention-compliant operator code"
+ ),
+ dev_model,
+ formatter,
+ memory,
+ )
+ agents.append(dj_dev_agent)
+
+ if agent_name == "dj_mcp":
+ mcp_toolkit, _ = await get_mcp_toolkit()
+ for tool in mcp_tools:
+ mcp_toolkit.register_tool_function(tool)
+
+ mcp_agent = create_agent(
+ "mcp_datajuicer_agent",
+ MCP_SYS_PROMPT,
+ mcp_toolkit,
+ (
+ "DataJuicer MCP Agent powered by Recipe Flow MCP server. \n"
+ "Core capabilities: \n"
+ "- Filter operators by tags/categories using MCP protocol; \n"
+ "- Real-time data processing pipeline execution. \n"
+ ),
+ model,
+ formatter,
+ memory,
+ )
+ agents.append(mcp_agent)
+
+    # Router agent - uses agents2toolkit to expose every sub-agent as a tool
+ router_agent = create_agent(
+ "Router",
+ ROUTER_SYS_PROMPT,
+ agents2toolkit(agents),
+ "A router agent that intelligently routes tasks to specialized DataJuicer agents",
+ model,
+ formatter,
+ InMemoryMemory(), # Router uses its own memory instance
+ )
+
+    if use_studio:
+ import agentscope
+
+ agentscope.init(
+ studio_url="http://localhost:3000",
+ project="data_agent",
+ )
+
+ msg = None
+ while True:
+ msg = await user(msg)
+ if msg.get_text_content() == "exit":
+ break
+ # Router agent handles the entire task with automatic multi-step routing
+ msg = await router_agent(msg)
+
+
+if __name__ == "__main__":
+    # Example tasks
+    # project_root = os.path.abspath(os.path.dirname(__file__))
+    # task = f"数据存储在{project_root}/data/demo-dataset-images.jsonl,筛选掉样本中,文本字段长度小于5的样本,以及图片size小于100Kb的样本。并将输出结果保存到./outputs路径下。"
+    # (The data is stored at {project_root}/data/demo-dataset-images.jsonl; filter out samples whose
+    # text field is shorter than 5 characters or whose images are smaller than 100 KB, and save the
+    # results under ./outputs.)
+    #
+    # DJ Development example task:
+    # task = "我想开发一个新的DataJuicer过滤算子,用于过滤掉没有人声的音频文件"
+    # ("I want to develop a new DataJuicer filter operator that filters out audio files without human voices.")
+    #
+    # MCP Agent will be automatically selected for advanced processing tasks
+ fire.Fire(main)
diff --git a/data_juicer_agent/prompts.py b/data_juicer_agent/prompts.py
new file mode 100644
index 0000000..6def4cf
--- /dev/null
+++ b/data_juicer_agent/prompts.py
@@ -0,0 +1,135 @@
+# -*- coding: utf-8 -*-
+
+DJ_SYS_PROMPT = """
+You are an expert data preprocessing assistant named {name}, specializing in handling multimodal data including text, images, videos, and other AI model-related data.
+
+You will strictly follow these steps sequentially:
+
+- Data Preview (optional but recommended):
+ Before generating the YAML, you may first use `view_text_file` to inspect a small subset of the raw data (e.g., the first 5–10 samples) so that you can:
+ 1. Verify the exact field names and formats;
+ 2. Decide appropriate values such as `text_keys`, `image_key`, and the parameters of subsequent operators.
+  If the user requests or needs more specific data analysis, use `dj-analyze` to analyze the data:
+    1. After creating the configuration file according to the requirements (see Step 2 for how to create it), run:
+      dj-analyze --config configs/your_analyzer.yaml
+    2. You can also use auto mode to avoid writing a recipe. It analyzes a small part of your dataset (e.g. 1000 samples, specified by the `auto_num` argument) with all Filters that produce stats.
+      dj-analyze --auto --dataset_path xx.jsonl [--auto_num 1000]
+
+Step 1: Tool Discovery and Matching
+ - First, use the `query_dj_operators` tool to get relevant DataJuicer operators based on the user's task description
+ - Analyze the retrieved operators and verify if they have exact functional matches with the input query
+ - If no suitable operators are found, immediately terminate the task
+ - If partially supported operators exist, skip incompatible parts and proceed
+
+Step 2: Generate Configuration File
+  - Create a YAML configuration containing global parameters and tool configurations. Save it to a YAML file with the yaml dump API.
+    After successful file creation, inform the user of the file location. File save failure indicates task failure.
+ a. Global Parameters:
+ - project_name: Project name
+ - dataset_path: Real data path (never fabricate paths. Set to `None` if unknown)
+ - export_path: Output path (use default if unspecified)
+ - text_keys: Text field names to process
+ - image_key: Image field name to process
+ - np: Multiprocessing count
+ Keep other parameters as defaults.
+
+ b. Operator Configuration:
+ - Use the operators retrieved from Step 1 to configure the 'process' field
+ - Ensure precise functional matching with user requirements
+
+Step 3: Execute Processing Task
+ Pre-execution checks:
+ - dataset_path: Must be a valid user-provided path and the path must exist
+ - process: Operator configuration list must exist
+ Terminate immediately if any check fails and explain why.
+
+ If all pre-execution checks are valid, run: `dj-process --config ${{YAML_config_file}}`
+
+Mandatory Requirements:
+- Never ask me questions. Make reasonable assumptions for non-critical parameters
+- Only generate the reply after the task has finished running
+- Always start by retrieving relevant operators using the query_dj_operators tool
+
+Configuration Template:
+```yaml
+# global parameters
+project_name: {{your project name}}
+dataset_path: {{path to your dataset directory or file}}
+text_keys: {{text key to be processed}}
+image_key: {{image key to be processed}}
+np: {{number of subprocess to process your dataset}}
+skip_op_error: false # must set to false
+
+export_path: {{single file path to save processed data, must be a jsonl file path not a folder}}
+
+# process schedule
+# a list of several process operators with their arguments
+process:
+ - image_shape_filter:
+ min_width: 100
+ min_height: 100
+ - text_length_filter:
+ min_len: 5
+ max_len: 10000
+ - ...
+```
+
+Available Tools:
+Function definitions:
+```
+{{index}}. {{function name}}: {{function description}}
+{{argument1 name}} ({{argument type}}): {{argument description}}
+{{argument2 name}} ({{argument type}}): {{argument description}}
+```
+
+"""
+
+DJ_DEV_SYS_PROMPT = """
+You are an expert DataJuicer operator development assistant named {name}, specializing in helping developers create new DataJuicer operators.
+
+Development Workflow:
+1. Understand user requirements and identify operator type (filter, mapper, deduplicator, etc.)
+2. Call `get_basic_files()` to get base_op classes and development guidelines
+3. Call `get_operator_example(operator_type)` to get relevant examples
+4. If previous tools report `DATA_JUICER_PATH` not configured, **STOP** and request user input with a clear message asking for the value of `DATA_JUICER_PATH`
+5. Once the user provides `DATA_JUICER_PATH`, call `configure_data_juicer_path(data_juicer_path)` with the provided value
+ **Do not attempt to set or infer `DATA_JUICER_PATH` on your own**
+
+Critical Requirements:
+- NEVER guess or fabricate file paths or configuration values
+- Always call get_basic_files() and get_operator_example() before writing code
+- Write complete, runnable code following DataJuicer conventions
+- Focus on practical implementation
+"""
+
+MCP_SYS_PROMPT = """You are {name}, an advanced DataJuicer MCP Agent powered by MCP server, specializing in handling multimodal data including text, images, videos, and other AI model-related data.
+
+Analyze user requirements and use the tools provided to you for data processing.
+
+Before data processing, you can also try:
+- Use `view_text_file` to inspect a small subset of the raw data (e.g., the first 2~5 samples) in order to:
+ 1. Verify the exact field names and formats
+ 2. Determine appropriate parameter values such as text length ranges, language types, confidence thresholds, etc.
+ 3. Understand data characteristics to optimize operator parameter configuration
+"""
+
+ROUTER_SYS_PROMPT = """
+You are an AI routing agent named {name}. Your primary responsibility is to analyze user queries and route them to the most appropriate specialized agent for handling.
+
+Key responsibilities:
+1. Understand the user's intent and requirements
+2. Select the most suitable agent from available options
+3. Handle user input requests from routed agents properly
+
+When routing to an agent that requires user input:
+- If the routed agent returns a response indicating that additional input or configuration is required for user confirmation or submission, you must:
+ 1. Stop the current routing process
+ 2. Present the agent's request to the user directly
+ 3. Wait for user's response before continuing
+ 4. Pass the user's input back to the appropriate agent
+
+- NEVER fabricate or guess user input values (like paths, configurations, etc.)
+- Always ask the user for the required information when an agent needs it
+
+Available agents and their capabilities will be provided as tools in your toolkit.
+"""
\ No newline at end of file
diff --git a/data_juicer_agent/pyproject.toml b/data_juicer_agent/pyproject.toml
new file mode 100644
index 0000000..fd0659f
--- /dev/null
+++ b/data_juicer_agent/pyproject.toml
@@ -0,0 +1,12 @@
+[project]
+name = "data-juicer-agent"
+version = "0.1.0"
+description = "A data processing agent with data juicer"
+readme = "README.md"
+requires-python = ">=3.11"
+dependencies = [
+ "agentscope>=1.0.5",
+ "faiss-cpu>=1.12.0",
+ "langchain-community",
+ "py-data-juicer>=1.4.2",
+]
diff --git a/data_juicer_agent/tools/__init__.py b/data_juicer_agent/tools/__init__.py
new file mode 100644
index 0000000..ecf012a
--- /dev/null
+++ b/data_juicer_agent/tools/__init__.py
@@ -0,0 +1,89 @@
+# -*- coding: utf-8 -*-
+"""
+Tools package for data-agent.
+
+This module provides a unified entry point for all agent tools,
+organized by agent type for easy access and management.
+"""
+from typing import Callable, List
+from agentscope.agent import AgentBase
+from agentscope.tool import (
+    Toolkit,
+    view_text_file,
+    write_text_file,
+)
+
+from .dj_tools import execute_safe_command
+from .router_tools import agent_to_tool
+from .dj_tools import query_dj_operators
+from .dj_dev_tools import get_basic_files, get_operator_example, configure_data_juicer_path
+from .mcp_tools import get_mcp_toolkit
+
+def create_toolkit(tools: List[Callable]) -> Toolkit:
+    """Create a toolkit and register the given tool functions."""
+    toolkit = Toolkit()
+ for tool in tools:
+ toolkit.register_tool_function(tool)
+
+ return toolkit
+
+# DJ Agent tools
+dj_tools = [
+ execute_safe_command,
+ view_text_file,
+ write_text_file,
+ query_dj_operators,
+]
+
+# DJ Development Agent tools - for developing DataJuicer operators
+dj_dev_tools = [
+ view_text_file,
+ write_text_file,
+ get_basic_files,
+ get_operator_example,
+ configure_data_juicer_path,
+]
+
+# MCP Agent tools - for advanced data processing with Recipe Flow MCP
+mcp_tools = [
+ view_text_file,
+ write_text_file,
+]
+
+def agents2toolkit(agents: List[AgentBase]) -> Toolkit:
+    """Wrap each agent as a callable tool and register them in a new toolkit."""
+    tools = [agent_to_tool(agent) for agent in agents]
+    return create_toolkit(tools)
+
+dj_toolkit = create_toolkit(dj_tools)
+dj_dev_toolkit = create_toolkit(dj_dev_tools)
+
+
+# All available tools
+all_toolkit = {
+ "dj": dj_toolkit,
+ "dj_dev": dj_dev_toolkit,
+ "dj_mcp": get_mcp_toolkit,
+ "router": agents2toolkit,
+}
+
+# Public API
+__all__ = [
+ "dj_tools",
+ "dj_dev_tools",
+ "mcp_tools",
+ "all_tools",
+ "agents2toolkit",
+ "dj_toolkit",
+ "dj_dev_toolkit",
+ "get_mcp_toolkit",
+ # Individual tools for direct import
+ "execute_safe_command",
+ "view_text_file",
+ "write_text_file",
+ "agent_to_tool",
+ "query_dj_operators",
+ "get_basic_files",
+ "get_operator_example",
+ "configure_data_juicer_path",
+]
\ No newline at end of file
diff --git a/data_juicer_agent/tools/dj_dev_tools.py b/data_juicer_agent/tools/dj_dev_tools.py
new file mode 100644
index 0000000..b7681e8
--- /dev/null
+++ b/data_juicer_agent/tools/dj_dev_tools.py
@@ -0,0 +1,235 @@
+# -*- coding: utf-8 -*-
+"""
+DataJuicer Development Tools
+
+Tools for developing DataJuicer operators, including access to basic documentation
+and example code for different operator types.
+"""
+
+import os
+from agentscope.message import TextBlock
+from agentscope.tool import ToolResponse
+
+# DataJuicer home path - should be configured based on your environment
+DATA_JUICER_PATH = os.getenv("DATA_JUICER_PATH", None)
+
+BASIC_LIST_RELATIVE = [
+ "data_juicer/ops/base_op.py",
+ "docs/DeveloperGuide.md",
+ "docs/DeveloperGuide_ZH.md",
+]
+
+
+def get_basic_files() -> ToolResponse:
+ """Get basic DataJuicer development files content.
+
+ Returns the content of essential files needed for DJ operator development:
+ - base_op.py: Base operator class
+ - DeveloperGuide.md: English developer guide
+ - DeveloperGuide_ZH.md: Chinese developer guide
+
+ Returns:
+ ToolResponse: Combined content of all basic development files
+ """
+ global DATA_JUICER_PATH, BASIC_LIST_RELATIVE
+ if DATA_JUICER_PATH is None:
+ return ToolResponse(
+ content=[
+ TextBlock(
+ type="text",
+ text="DATA_JUICER_PATH is not configured. Please ask the user to provide the DATA_JUICER_PATH",
+ )
+ ]
+ )
+
+ try:
+ combined_content = "# DataJuicer Operator Development Basic Files\n\n"
+
+ for relative_path in BASIC_LIST_RELATIVE:
+ file_path = os.path.join(DATA_JUICER_PATH, relative_path)
+ if os.path.exists(file_path):
+ try:
+ with open(file_path, "r", encoding="utf-8") as f:
+ content = f.read()
+
+ filename = os.path.basename(file_path)
+ combined_content += f"## {filename}\n\n"
+ combined_content += (
+ f"```{'python' if filename.endswith('.py') else 'markdown'}\n"
+ )
+ combined_content += content
+ combined_content += "\n```\n\n"
+ except Exception as e:
+ combined_content += (
+ f"## {os.path.basename(file_path)} (Read Failed)\n"
+ )
+ combined_content += f"Error: {str(e)}\n\n"
+
+ return ToolResponse(content=[TextBlock(type="text", text=combined_content)])
+
+ except Exception as e:
+ return ToolResponse(
+ content=[
+ TextBlock(
+ type="text",
+ text=f"Error occurred while getting basic files: {str(e)}",
+ )
+ ]
+ )
+
+
+async def get_operator_example(
+ requirement_description: str, limit: int = 2
+) -> ToolResponse:
+ """Get example operators based on requirement description using dynamic search.
+
+ Args:
+ requirement_description (str): Natural language description of the operator requirement
+ limit (int): Maximum number of example operators to return (default: 2)
+
+ Returns:
+ ToolResponse: Example operator code and test files based on the requirement
+ """
+ global DATA_JUICER_PATH
+ if DATA_JUICER_PATH is None:
+ return ToolResponse(
+ content=[
+ TextBlock(
+ type="text",
+ text="DATA_JUICER_PATH is not configured. Please ask the user to provide the DATA_JUICER_PATH",
+ )
+ ]
+ )
+
+ try:
+ # Import retrieve_ops from op_manager
+ from .op_manager.op_retrieval import retrieve_ops
+
+ # Query relevant operators using the requirement description
+ # Use retrieval mode from environment variable if set
+ retrieval_mode = os.environ.get("RETRIEVAL_MODE", "auto")
+ tool_names = await retrieve_ops(requirement_description, limit=limit, mode=retrieval_mode)
+
+ if not tool_names:
+ return ToolResponse(
+ content=[
+ TextBlock(
+ type="text",
+ text=f"No relevant operators found for requirement: {requirement_description}\n"
+ f"Please try with more specific keywords or check if DATA_JUICER_PATH is properly configured.",
+ )
+ ]
+ )
+
+ combined_content = (
+ f"# Dynamic Operator Examples for: {requirement_description}\n\n"
+ )
+ combined_content += (
+ f"Found {len(tool_names)} relevant operators (limit: {limit})\n\n"
+ )
+
+ # Process each found operator
+ for i, tool_name in enumerate(tool_names[:limit]):
+ combined_content += f"## {i+1}. {tool_name}\n\n"
+
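+            # Data-Juicer operator names end with their type,
+            # e.g. "text_length_filter" lives under data_juicer/ops/filter/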
+ op_type = tool_name.split("_")[-1]
+
+ operator_path = f"data_juicer/ops/{op_type}/{tool_name}.py"
+
+ # Try to find operator source file
+
+ full_path = os.path.join(DATA_JUICER_PATH, operator_path)
+ if os.path.exists(full_path):
+ with open(full_path, "r", encoding="utf-8") as f:
+ operator_code = f.read()
+
+ combined_content += f"### Source Code\n"
+ combined_content += "```python\n"
+ combined_content += operator_code
+ combined_content += "\n```\n\n"
+ else:
+ combined_content += (
+ f"**Note:** Source code file not found for `{tool_name}`.\n\n"
+ )
+
+ test_path = f"tests/ops/{op_type}/test_{tool_name}.py"
+
+ full_test_path = os.path.join(DATA_JUICER_PATH, test_path)
+ if os.path.exists(full_test_path):
+ with open(full_test_path, "r", encoding="utf-8") as f:
+ test_code = f.read()
+
+ combined_content += f"### Test Code\n"
+ combined_content += f"**File Path:** `{test_path}`\n\n"
+ combined_content += "```python\n"
+ combined_content += test_code
+ combined_content += "\n```\n\n"
+
+ else:
+ combined_content += (
+ f"**Note:** Test file not found for `{tool_name}`.\n\n"
+ )
+
+ combined_content += "---\n\n"
+
+ return ToolResponse(content=[TextBlock(type="text", text=combined_content)])
+
+ except Exception as e:
+ return ToolResponse(
+ content=[
+ TextBlock(
+ type="text",
+ text=f"Error occurred while getting operator examples: {str(e)}\n"
+ f"Please check the requirement description and try again.",
+ )
+ ]
+ )
+
+
+def configure_data_juicer_path(data_juicer_path: str) -> ToolResponse:
+ """Configure DataJuicer path.
+ If the user provides the data_juicer_path, please use this method to configure it.
+
+ Args:
+ data_juicer_path (str): Path to DataJuicer installation
+
+ Returns:
+ ToolResponse: Configuration result
+ """
+ global DATA_JUICER_PATH
+
+ data_juicer_path = os.path.expanduser(data_juicer_path)
+
+ try:
+ if not os.path.exists(data_juicer_path):
+ return ToolResponse(
+ content=[
+ TextBlock(
+ type="text",
+ text=f"Specified DataJuicer path does not exist: {data_juicer_path}",
+ )
+ ]
+ )
+
+ # Update global DATA_JUICER_PATH
+ DATA_JUICER_PATH = data_juicer_path
+
+ return ToolResponse(
+ content=[
+ TextBlock(
+ type="text",
+ text=f"DataJuicer path has been updated to: {DATA_JUICER_PATH}",
+ )
+ ]
+ )
+
+ except Exception as e:
+ return ToolResponse(
+ content=[
+ TextBlock(
+ type="text",
+ text=f"Error occurred while configuring DataJuicer path: {str(e)}",
+ )
+ ]
+ )
diff --git a/data_juicer_agent/tools/dj_tools.py b/data_juicer_agent/tools/dj_tools.py
new file mode 100644
index 0000000..9cd5e76
--- /dev/null
+++ b/data_juicer_agent/tools/dj_tools.py
@@ -0,0 +1,224 @@
+import os
+import os.path as osp
+import json
+import asyncio
+from typing import Any
+from agentscope.message import TextBlock
+from agentscope.tool import ToolResponse
+from .op_manager.op_retrieval import retrieve_ops
+
+# Load tool information for formatting
+TOOLS_INFO_PATH = osp.join(osp.dirname(__file__), "op_manager", "dj_funcs_all.json")
+
+def _load_tools_info():
+ """Load tools information from JSON file or create it if not exists"""
+ if osp.exists(TOOLS_INFO_PATH):
+ with open(TOOLS_INFO_PATH, "r", encoding="utf-8") as f:
+ return json.loads(f.read())
+ else:
+ from .op_manager.create_dj_func_info import dj_func_info
+ with open(TOOLS_INFO_PATH, "w", encoding="utf-8") as f:
+ json.dump(dj_func_info, f)
+ return dj_func_info
+
+def _format_tool_names_to_class_entries(tool_names):
+ """Convert tool names list to formatted class entries string"""
+ if not tool_names:
+ return ""
+
+ tools_info = _load_tools_info()
+
+ # Create a mapping from class_name to tool info for quick lookup
+ tools_map = {tool['class_name']: tool for tool in tools_info}
+
+ formatted_entries = []
+ for i, tool_name in enumerate(tool_names):
+ if tool_name in tools_map:
+ tool_info = tools_map[tool_name]
+ class_entry = f"{i+1}. {tool_info['class_name']}: {tool_info['class_desc']}"
+ class_entry += "\n" + tool_info["arguments"]
+ formatted_entries.append(class_entry)
+
+ return "\n".join(formatted_entries)
+
+async def query_dj_operators(query: str, limit: int = 20) -> ToolResponse:
+ """Query DataJuicer operators by natural language description.
+
+ Retrieves relevant operators from DataJuicer library based on user query.
+ Supports matching by functionality, data type, and processing scenarios.
+
+ Args:
+ query (str): Natural language operator query
+ limit (int): Maximum number of operators to return (default: 20)
+
+ Returns:
+ ToolResponse: Tool response containing matched operators with names, descriptions, and parameters
+ """
+
+ try:
+ # Retrieve operator names using existing functionality with limit
+ # Use retrieval mode from environment variable if set
+ retrieval_mode = os.environ.get("RETRIEVAL_MODE", "auto")
+ tool_names = await retrieve_ops(query, limit=limit, mode=retrieval_mode)
+
+ if not tool_names:
+ return ToolResponse(
+ content=[
+ TextBlock(
+ type="text",
+ text=f"No matching DataJuicer operators found for query: {query}\n"
+ f"Suggestions:\n"
+ f"1. Use more specific keywords like 'text filter', 'image processing'\n"
+ f"2. Check spelling and try alternative terms\n"
+ f"3. Try English keywords for better matching",
+ )
+ ],
+ )
+
+ # Format tool names to class entries
+ retrieved_operators = _format_tool_names_to_class_entries(tool_names)
+
+ # Format response
+ result_text = f"🔍 DataJuicer Operator Query Results\n"
+ result_text += f"Query: {query}\n"
+ result_text += f"Limit: {limit} operators\n"
+ result_text += f"{'='*50}\n\n"
+ result_text += retrieved_operators
+
+ return ToolResponse(
+ content=[
+ TextBlock(
+ type="text",
+ text=result_text,
+ )
+ ],
+ )
+
+ except Exception as e:
+ return ToolResponse(
+ content=[
+ TextBlock(
+ type="text",
+ text=f"Error querying DataJuicer operators: {str(e)}\n"
+ f"Please verify query parameters and retry.",
+ )
+ ],
+ )
+
+
+async def execute_safe_command(
+ command: str,
+ timeout: int = 300,
+ **kwargs: Any,
+) -> ToolResponse:
+ """Execute safe commands including DataJuicer commands and other safe system commands.
+    Returns the return code, standard output and error within <returncode></returncode>,
+    <stdout></stdout> and <stderr></stderr> tags.
+
+ Args:
+ command (`str`):
+ The command to execute. Allowed commands include:
+ - DataJuicer commands: dj-process, dj-analyze
+ - File system commands: mkdir, ls, pwd, cat, echo, cp, mv, rm
+ - Text processing: grep, head, tail, wc, sort, uniq
+ - Archive commands: tar, zip, unzip
+ - Other safe commands: which, whoami, date, find
+        timeout (`int`, defaults to `300`):
+ The maximum time (in seconds) allowed for the command to run.
+
+ Returns:
+ `ToolResponse`:
+ The tool response containing the return code, standard output, and
+ standard error of the executed command.
+ """
+
+ # Security check: only allow safe commands
+ command_stripped = command.strip()
+
+ # Define allowed command prefixes for security
+ allowed_commands = [
+ # DataJuicer commands
+ 'dj-process', 'dj-analyze',
+ # File system operations
+ 'mkdir', 'ls', 'pwd', 'cat', 'echo', 'cp', 'mv', 'rm',
+ # Text processing
+ 'grep', 'head', 'tail', 'wc', 'sort', 'uniq',
+ # Archive operations
+ 'tar', 'zip', 'unzip',
+ # Information commands
+ 'which', 'whoami', 'date', 'find',
+ # Python commands
+ 'python', 'python3', 'pip', 'uv'
+ ]
+
+ # Check if command starts with any allowed command
+ command_allowed = False
+ for allowed_cmd in allowed_commands:
+ if command_stripped.startswith(allowed_cmd):
+ # Additional security checks for potentially dangerous commands
+ if allowed_cmd in ['rm', 'mv'] and ('/' in command_stripped or '..' in command_stripped):
+ # Prevent dangerous path operations
+ continue
+ command_allowed = True
+ break
+
+ if not command_allowed:
+ error_msg = f"Error: Command not allowed for security reasons. Allowed commands: {', '.join(allowed_commands)}. Received command: {command}"
+ return ToolResponse(
+ content=[
+ TextBlock(
+ type="text",
+ text=(
+ f"-1"
+ f""
+ f"{error_msg}"
+ ),
+ ),
+ ],
+ )
+
+ proc = await asyncio.create_subprocess_shell(
+ command,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE,
+ bufsize=0,
+ )
+
+ try:
+ await asyncio.wait_for(proc.wait(), timeout=timeout)
+ stdout, stderr = await proc.communicate()
+ stdout_str = stdout.decode("utf-8")
+ stderr_str = stderr.decode("utf-8")
+ returncode = proc.returncode
+
+ except asyncio.TimeoutError:
+ stderr_suffix = (
+ f"TimeoutError: The command execution exceeded "
+ f"the timeout of {timeout} seconds."
+ )
+ returncode = -1
+ try:
+ proc.terminate()
+ stdout, stderr = await proc.communicate()
+ stdout_str = stdout.decode("utf-8")
+ stderr_str = stderr.decode("utf-8")
+ if stderr_str:
+ stderr_str += f"\n{stderr_suffix}"
+ else:
+ stderr_str = stderr_suffix
+ except ProcessLookupError:
+ stdout_str = ""
+ stderr_str = stderr_suffix
+
+ return ToolResponse(
+ content=[
+ TextBlock(
+ type="text",
+ text=(
+ f"{returncode}"
+ f"{stdout_str}"
+ f"{stderr_str}"
+ ),
+ ),
+ ],
+ )
\ No newline at end of file
diff --git a/data_juicer_agent/tools/mcp_tools.py b/data_juicer_agent/tools/mcp_tools.py
new file mode 100644
index 0000000..c441134
--- /dev/null
+++ b/data_juicer_agent/tools/mcp_tools.py
@@ -0,0 +1,120 @@
+import json
+import os
+import logging
+from typing import List, Optional, Tuple
+import string
+
+from agentscope.tool import Toolkit
+from agentscope.mcp import HttpStatefulClient, HttpStatelessClient, StdIOStatefulClient
+
+# Configure logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+root_path = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
+
+def _load_config(config_path: str) -> dict:
+ """Load MCP configuration from file"""
+ try:
+ if os.path.exists(config_path):
+ with open(config_path, "r", encoding="utf-8") as f:
+ config = json.load(f)
+ logger.info(f"Loaded MCP configuration from {config_path}")
+ return config
+ else:
+ logger.warning(
+ f"Configuration file {config_path} not found, using default settings"
+ )
+ return _create_default_config()
+ except Exception as e:
+ logger.error(f"Error loading configuration: {e}")
+ return _create_default_config()
+
+def _create_default_config() -> dict:
+ """Create default configuration"""
+ return {
+ "mcpServers": {
+ "dj_recipe_flow": {
+ "command": "python",
+ "args": ["/home/test/data_juicer/tools/DJ_mcp_recipe_flow.py"],
+ "env": {"SERVER_TRANSPORT": "stdio"},
+ }
+ }
+ }
+
+def _expand_env_vars(value: str) -> str:
+ """Expand environment variables in configuration values"""
+ if isinstance(value, str):
+ template = string.Template(value)
+ try:
+ return template.substitute(os.environ)
+ except KeyError as e:
+ logger.warning(f"Environment variable not found: {e}")
+ return value
+ return value
+
+async def _create_clients(config: dict, toolkit: Toolkit):
+ """Create MCP clients based on configuration"""
+ server_configs = config.get("mcpServers", {})
+ clients = []
+
+ for server_name, server_config in server_configs.items():
+ try:
+ # Handle StdIO client
+ if "command" in server_config:
+ command = server_config["command"]
+ args = server_config.get("args", [])
+ env = server_config.get("env", {})
+
+ # Expand environment variables
+ expanded_args = [_expand_env_vars(arg) for arg in args]
+ expanded_env = {k: _expand_env_vars(v) for k, v in env.items()}
+
+ client = StdIOStatefulClient(
+ name=server_name,
+ command=command,
+ args=expanded_args,
+ env=expanded_env,
+ )
+
+ await client.connect()
+ await toolkit.register_mcp_client(client)
+
+ # Handle HTTP clients
+ elif "url" in server_config:
+ url = _expand_env_vars(server_config["url"])
+ transport = server_config.get("transport", "sse")
+ stateful = server_config.get("stateful", True)
+
+ if stateful:
+ client = HttpStatefulClient(
+ name=server_name, transport=transport, url=url
+ )
+ await client.connect()
+ await toolkit.register_mcp_client(client)
+ else:
+ client = HttpStatelessClient(
+ name=server_name, transport=transport, url=url
+ )
+ await toolkit.register_mcp_client(client)
+
+ else:
+ raise ValueError("Invalid server configuration")
+
+ clients.append(client)
+ except Exception as e:
+ if "Invalid server configuration" in str(e):
+ raise e
+ logger.error(f"Failed to create client {server_name}: {e}")
+
+ return clients
+
+async def get_mcp_toolkit(config_path: Optional[str] = None) -> Tuple[Toolkit, List]:
+    """Get a toolkit with all MCP tools registered, together with the created clients."""
+ config_path = config_path or root_path + "/configs/mcp_config.json"
+ config = _load_config(config_path)
+ toolkit = Toolkit()
+
+ clients = await _create_clients(config, toolkit)
+
+ return toolkit, clients
diff --git a/data_juicer_agent/tools/op_manager/__init__.py b/data_juicer_agent/tools/op_manager/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/data_juicer_agent/tools/op_manager/create_dj_func_info.py b/data_juicer_agent/tools/op_manager/create_dj_func_info.py
new file mode 100644
index 0000000..d7abc3a
--- /dev/null
+++ b/data_juicer_agent/tools/op_manager/create_dj_func_info.py
@@ -0,0 +1,34 @@
+import inspect
+from data_juicer.tools.op_search import OPSearcher
+
+searcher = OPSearcher(include_formatter=False)
+
+all_ops = searcher.search()
+
+dj_func_info = []
+for i, op in enumerate(all_ops):
+ class_entry = {"index": i, "class_name": op["name"], "class_desc": op["desc"]}
+ param_desc = op["param_desc"]
+ param_desc_map = {}
+ args = ""
+ for item in param_desc.split(":param"):
+ _item = item.split(":")
+ if len(_item) < 2:
+ continue
+ param_desc_map[_item[0].strip()] = ":".join(_item[1:]).strip()
+
+ if op["sig"]:
+ for param_name, param in op["sig"].parameters.items():
+ if param_name in ["self", "args", "kwargs"]:
+ continue
+ if param.kind in (
+ inspect.Parameter.VAR_POSITIONAL,
+ inspect.Parameter.VAR_KEYWORD,
+ ):
+ continue
+ if param_name in param_desc_map:
+ args += f" {param_name} ({param.annotation}): {param_desc_map[param_name]}\n"
+ else:
+ args += f" {param_name} ({param.annotation})\n"
+ class_entry["arguments"] = args
+ dj_func_info.append(class_entry)
diff --git a/data_juicer_agent/tools/op_manager/dj_funcs_all.json b/data_juicer_agent/tools/op_manager/dj_funcs_all.json
new file mode 100644
index 0000000..44f06f1
--- /dev/null
+++ b/data_juicer_agent/tools/op_manager/dj_funcs_all.json
@@ -0,0 +1 @@
+[{"index": 0, "class_name": "nested_aggregator", "class_desc": "Aggregates nested content from multiple samples into a single summary.\n\n This operator uses a recursive summarization approach to aggregate content from multiple\n samples. It processes the input text, which is split into sub-documents, and generates a\n summary that maintains the average length of the original documents. The aggregation is\n performed using an API model, guided by system prompts and templates. The operator\n supports retrying the API call in case of errors and allows for customization of the\n summarization process through various parameters. The default system prompt and\n templates are provided in Chinese, but they can be customized. The operator uses a\n Hugging Face tokenizer to handle tokenization.", "arguments": " api_model (): API model name.\n input_key (): The input key in the meta field of the samples.\n It is \"event_description\" in default.\n output_key (): The output key in the aggregation field in the\n samples. It is same as the input_key in default.\n max_token_num (typing.Optional[typing.Annotated[int, Gt(gt=0)]]): The max token num of the total tokens of the\n sub documents. Without limitation if it is None.\n api_endpoint (typing.Optional[str]): URL endpoint for the API.\n response_path (typing.Optional[str]): Path to extract content from the API response.\n Defaults to 'choices.0.message.content'.\n system_prompt (typing.Optional[str]): The system prompt.\n sub_doc_template (typing.Optional[str]): The template for input text in each sample.\n input_template (typing.Optional[str]): The input template.\n try_num (typing.Annotated[int, Gt(gt=0)]): The number of retry attempts when there is an API\n call error or output parsing error.\n model_params (typing.Dict): Parameters for initializing the API model.\n sampling_params (typing.Dict): Extra parameters passed to the API call.\n e.g {'temperature': 0.9, 'top_p': 0.95}\n"}, {"index": 1, "class_name": "entity_attribute_aggregator", "class_desc": "Summarizes a given attribute of an entity from a set of documents.\n\n The operator extracts and summarizes the specified attribute of a given entity from the\n provided documents. It uses a system prompt, example prompt, and input template to\n generate the summary. The output is formatted as a markdown-style summary with the\n entity and attribute clearly labeled. The summary is limited to a specified number of\n words (default is 100). The operator uses a Hugging Face tokenizer to handle token\n limits and splits documents if necessary. If the input key or required fields are\n missing, the operator logs a warning and returns the sample unchanged. The summary is\n stored in the batch metadata under the specified output key. The system prompt, input\n template, example prompt, and output pattern can be customized.", "arguments": " api_model (): API model name.\n entity (): The given entity.\n attribute (): The given attribute.\n input_key (): The input key in the meta field of the samples.\n It is \"event_description\" in default.\n output_key (): The output key in the aggregation field of the\n samples. It is \"entity_attribute\" in default.\n word_limit (typing.Annotated[int, Gt(gt=0)]): Prompt the output length.\n max_token_num (typing.Optional[typing.Annotated[int, Gt(gt=0)]]): The max token num of the total tokens of the\n sub documents. 
Without limitation if it is None.\n api_endpoint (typing.Optional[str]): URL endpoint for the API.\n response_path (typing.Optional[str]): Path to extract content from the API response.\n Defaults to 'choices.0.message.content'.\n system_prompt_template (typing.Optional[str]): The system prompt template.\n example_prompt (typing.Optional[str]): The example part in the system prompt.\n input_template (typing.Optional[str]): The input template.\n output_pattern_template (typing.Optional[str]): The output template.\n try_num (typing.Annotated[int, Gt(gt=0)]): The number of retry attempts when there is an API\n call error or output parsing error.\n model_params (typing.Dict): Parameters for initializing the API model.\n sampling_params (typing.Dict): Extra parameters passed to the API call.\n e.g {'temperature': 0.9, 'top_p': 0.95}\n"}, {"index": 2, "class_name": "meta_tags_aggregator", "class_desc": "Merge similar meta tags into a single, unified tag.\n\n This operator aggregates and consolidates similar meta tags from the input data. It can\n handle two scenarios:\n - When a set of target tags is provided, it maps the original tags to these predefined\n categories. If a \"miscellaneous\" or \"other\" category is included, any tags that do not\n fit into the specified categories are grouped under this label.\n - When no target tags are provided, it generates reasonable categories based on the\n similarity and frequency of the input tags.\n\n The operator uses a language model (default: gpt-4o) to analyze and merge the tags. The\n system prompt, input template, and output pattern can be customized. The aggregated tags\n are then updated in the input sample's metadata.", "arguments": " api_model (): API model name.\n meta_tag_key (): The key of the meta tag to be mapped.\n target_tags (typing.Optional[typing.List[str]]): The tags that is supposed to be mapped to.\n api_endpoint (typing.Optional[str]): URL endpoint for the API.\n response_path (typing.Optional[str]): Path to extract content from the API response.\n Defaults to 'choices.0.message.content'.\n system_prompt (typing.Optional[str]): The system prompt.\n input_template (typing.Optional[str]): The input template.\n target_tag_template (typing.Optional[str]): The tap template for target tags.\n tag_template (typing.Optional[str]): The tap template for each tag and its\n frequency.\n output_pattern (typing.Optional[str]): The output pattern.\n try_num (typing.Annotated[int, Gt(gt=0)]): The number of retry attempts when there is an API\n call error or output parsing error.\n model_params (typing.Dict): Parameters for initializing the API model.\n sampling_params (typing.Dict): Extra parameters passed to the API call.\n e.g {'temperature': 0.9, 'top_p': 0.95}\n"}, {"index": 3, "class_name": "most_relevant_entities_aggregator", "class_desc": "Extracts and ranks entities closely related to a given entity from provided texts.\n\n The operator uses a language model API to identify and rank entities, filtering out\n entities of the same type as the given entity. The ranked list is sorted in descending\n order of importance. Input texts are aggregated and passed to the model, with an\n optional token limit. The output is parsed using a regular expression to extract the\n relevant entities. Results are stored in the batch metadata under the key\n 'most_relevant_entities'. The operator retries the API call up to a specified number of\n times in case of errors. 
The system prompt, input template, and output pattern can be\n customized.", "arguments": " api_model (): API model name.\n entity (): The given entity.\n query_entity_type (): The type of queried relevant entities.\n input_key (): The input key in the meta field of the samples.\n It is \"event_description\" in default.\n output_key (): The output key in the aggregation field of the\n samples. It is \"most_relevant_entities\" in default.\n max_token_num (typing.Optional[typing.Annotated[int, Gt(gt=0)]]): The max token num of the total tokens of the\n sub documents. Without limitation if it is None.\n api_endpoint (typing.Optional[str]): URL endpoint for the API.\n response_path (typing.Optional[str]): Path to extract content from the API response.\n Defaults to 'choices.0.message.content'.\n system_prompt_template (typing.Optional[str]): The system prompt template.\n input_template (typing.Optional[str]): The input template.\n output_pattern (typing.Optional[str]): The output pattern.\n try_num (typing.Annotated[int, Gt(gt=0)]): The number of retry attempts when there is an API\n call error or output parsing error.\n model_params (typing.Dict): Parameters for initializing the API model.\n sampling_params (typing.Dict): Extra parameters passed to the API call.\n e.g {'temperature': 0.9, 'top_p': 0.95}\n"}, {"index": 4, "class_name": "document_deduplicator", "class_desc": "Deduplicates samples at the document level using exact matching.\n\n This operator computes an MD5 hash for each sample's text. It can optionally convert the\n text to lowercase and ignore non-alphabet characters, including whitespaces, digits, and\n punctuation. The deduplication is based on the computed hash values, where samples with\n identical hashes are considered duplicates. The `compute_hash` method adds a 'hash' key\n to each sample, storing its MD5 hash. During processing, the first occurrence of each\n unique hash is kept, and subsequent duplicates are filtered out. If the `show_num`\n parameter is set, the operator also returns a specified number of duplicate pairs for\n inspection.", "arguments": " lowercase (): Whether to convert sample text to lower case\n ignore_non_character (): Whether to ignore non-alphabet\n characters, including whitespaces, digits, and punctuations\n"}, {"index": 5, "class_name": "document_minhash_deduplicator", "class_desc": "Deduplicates samples at the document level using MinHash LSH.\n\n This operator computes MinHash values for each sample and uses Locality-Sensitive\n Hashing (LSH) to identify and remove near-duplicate documents. The Jaccard similarity\n threshold determines when two documents are considered duplicates. The tokenization\n method can be customized, and a Hugging Face tokenizer can be used for 'sentencepiece'\n tokenization. The minhash values are stored as bytes and are not kept in the final\n dataset. The number of bands and rows per band in LSH can be set manually or determined\n by an optimal parameter computation algorithm. Important notes:\n - If using 'punctuation' tokenization with an ignore pattern, ensure the pattern does\n not include punctuations.\n - For 'sentencepiece' tokenization, a tokenizer model path is required.\n - The deduplication process involves clustering and filtering, and only unique samples\n or the first sample in a cluster are retained.", "arguments": " tokenization (): tokenization method for sample texts. It\n should be one of [space, punctuation, character,\n sentencepiece]. 
For English-like languages, we recommend\n to use 'space', for Chinese-like languages, we recommend\n to use 'character', and for multiple languages, we recommend\n to use 'sentencepiece'. If using 'sentencepiece', please\n provide the model path in the 'tokenizer_model' field.\n window_size (typing.Annotated[int, Gt(gt=0)]): window size of shingling\n lowercase (): whether to convert text to lower case first\n ignore_pattern (typing.Optional[str]): whether to ignore sub-strings with\n specific pattern when computing minhash\n num_permutations (typing.Annotated[int, Gt(gt=0)]): number of permutations in minhash\n computing\n jaccard_threshold (typing.Annotated[float, FieldInfo(annotation=NoneType, required=True, metadata=[Ge(ge=0), Le(le=1)])]): the min jaccard similarity threshold\n in near-duplicate detection. When the jaccard similarity of\n two sample texts is >= this threshold, they are regarded as\n similar samples and this op will only keep one of them after\n deduplication\n num_bands (typing.Optional[typing.Annotated[int, Gt(gt=0)]]): number of bands in LSH. Default it's None, and\n it will be determined by an optimal params computation\n algorithm by minimizing the weighted sum of probs of False\n Positives and False Negatives\n num_rows_per_band (typing.Optional[typing.Annotated[int, Gt(gt=0)]]): number of rows in each band in LSH.\n Default it's None, and it will be determined by an optimal\n params computation algorithm\n tokenizer_model (typing.Optional[str]): path for the sentencepiece model, used for\n sentencepiece tokenization.\n"}, {"index": 6, "class_name": "document_simhash_deduplicator", "class_desc": "Deduplicates samples at the document level using SimHash.\n\n This operator computes SimHash values for each sample and removes duplicates based on a\n specified Hamming distance threshold. It supports different tokenization methods:\n 'space', 'punctuation', and 'character'. The SimHash is computed over shingles of a\n given window size, and the deduplication process clusters similar documents and retains\n only one from each cluster. The default mode converts text to lowercase and can ignore\n specific patterns. The key metric, Hamming distance, is used to determine similarity\n between SimHash values. Important notes:\n - The `ignore_pattern` parameter can be used to exclude certain substrings during\n SimHash computation.\n - For punctuation-based tokenization, the `ignore_pattern` should not include\n punctuations to avoid conflicts.\n - The `hamming_distance` must be less than the number of blocks (`num_blocks`).\n - Only the first sample in each cluster is retained by default.", "arguments": " tokenization (): tokenization method for\n sample texts.\n\n It should be one of [space, punctuation, character]. For\n English-like languages, we recommend to use 'space'. And for\n Chinese-like languages, we recommend to use 'character'\n window_size (typing.Annotated[int, Gt(gt=0)]): window size of shingling\n lowercase (): whether to convert text to lower case first\n ignore_pattern (typing.Optional[str]): whether to ignore sub-strings with\n specific pattern when computing simhash\n num_blocks (typing.Annotated[int, Gt(gt=0)]): number of blocks in simhash computing\n hamming_distance (typing.Annotated[int, Gt(gt=0)]): the max hamming distance threshold in\n near-duplicate detection. When the hamming distance of two\n sample texts is <= this threshold, they are regarded as\n similar samples and this op will only keep one of them after\n deduplication. 
This threshold should always be less than\n num_blocks\n"}, {"index": 7, "class_name": "general_fused_op", "class_desc": "An explicitly fused operator designed to execute multiple sequential operations (OPs) on\n the same batch, enabling fine-grained control over data processing.\n\n This operator allows for the chaining of multiple data processing steps, such as mappers\n and filters, into a single pass. It processes each batch of samples sequentially through\n the defined operations, ensuring that all specified transformations are applied in\n order. The operator supports both mappers, which transform data, and filters, which\n remove or keep samples based on computed statistics. Context variables can be passed\n between operations if needed. The accelerator is set to 'cuda' if any of the fused\n operations use it. The number of processes is determined by the minimum value among all\n fused operations. After processing, any temporary context variables, such as those used\n for video containers, are cleaned up.", "arguments": " batch_size (): the batch size of the input samples.\n fused_op_list (typing.Optional[typing.List]): a list of OPs to be fused.\n"}, {"index": 8, "class_name": "image_deduplicator", "class_desc": "Deduplicates samples at the document level by exact matching of images.\n\n This operator compares images across documents to identify and remove duplicates.\n - It uses a specified hash method (default is 'phash') to compute image hashes.\n - If `consider_text` is set, it also considers text content for deduplication,\n using a text deduplicator in conjunction with the image hashes.\n - The key metric, `imagehash`, is computed for each sample. If `consider_text`\n is enabled, an additional `hash` field is used.\n - Duplicates are identified by comparing these hash values. Samples with\n identical hashes are considered duplicates.\n - When `show_num` is greater than 0, the operator also returns a subset of\n duplicate pairs for tracing purposes.\n - The operator caches the `imagehash` and, if applicable, the `hash` fields.", "arguments": " method (): hash method for image\n consider_text (): whether to consider text hash together with image\n hash when applying deduplication.\n"}, {"index": 9, "class_name": "ray_bts_minhash_deduplicator", "class_desc": "A MinhashLSH deduplicator that operates in Ray distributed mode.\n\n This operator uses the MinHash LSH technique to identify and remove near-duplicate\n samples from a dataset. It supports various tokenization methods, including space,\n punctuation, character, and sentencepiece. The Jaccard similarity threshold is used to\n determine if two samples are considered duplicates. If the Jaccard similarity of two\n samples is greater than or equal to the specified threshold, one of the samples is\n filtered out. The operator computes the MinHash values for each sample and uses a union-\n find algorithm to group similar samples. The key metric, Jaccard similarity, is computed\n based on the shingling of the text. The operator can run on both CPU and GPU, with\n specific batch size and memory configurations for each.", "arguments": " tokenization (): tokenization method for sample texts. It\n should be one of [space, punctuation, character,\n sentencepiece]. For English-like languages, we recommend\n to use 'space', for Chinese-like languages, we recommend\n to use 'character', and for multiple languages, we recommend\n to use 'sentencepiece'. 
If using 'sentencepiece', please\n provide the model path in the 'tokenizer_model' field.\n window_size (typing.Annotated[int, Gt(gt=0)]): window size of shingling\n lowercase (): whether to convert text to lower case first\n ignore_pattern (typing.Optional[str]): whether to ignore sub-strings with\n specific pattern when computing minhash\n num_permutations (typing.Annotated[int, Gt(gt=0)]): number of permutations in minhash\n computing\n jaccard_threshold (typing.Annotated[float, FieldInfo(annotation=NoneType, required=True, metadata=[Ge(ge=0), Le(le=1)])]): the min jaccard similarity threshold\n in near-duplicate detection. When the jaccard similarity of\n two sample texts is >= this threshold, they are regarded as\n similar samples and this op will only keep one of them after\n deduplication\n num_bands (typing.Optional[typing.Annotated[int, Gt(gt=0)]]): number of bands in LSH. Default it's None, and\n it will be determined by an optimal params computation\n algorithm by minimizing the weighted sum of probs of False\n Positives and False Negatives\n num_rows_per_band (typing.Optional[typing.Annotated[int, Gt(gt=0)]]): number of rows in each band in LSH.\n Default it's None, and it will be determined by an optimal\n params computation algorithm\n tokenizer_model (typing.Optional[str]): path for the sentencepiece model, used for\n sentencepiece tokenization.\n union_find_parallel_num (typing.Union[int, str]): number of parallel workers for\n union-find algorithm. Default it's 'auto', and it will be\n determined by half of the number of CPUs.\n union_threshold (typing.Optional[int]): threshold for minhash values group to\n perform union-find algorithm. Default it's 256.\n max_pending_edge_buffer_task (typing.Optional[int]): max number of pending edge buffer\n ray tasks. Default it's 20.\n num_edge_buffer_task_returns (typing.Optional[int]): number of edge buffer tasks for\n `ray.wait` to return. Default it's 10.\n max_pending_filter_tasks (typing.Optional[int]): max number of pending filter ray\n tasks. Default it's 20.\n num_filter_task_returns (typing.Optional[int]): number of filter tasks for `ray.wait`\n to return. Default it's 10.\n merge_batch_size (typing.Optional[int]): batch size for BTS operations. Default\n it's 1000.\n minhash_batch_size (typing.Union[int, str, NoneType]): batch size for MinHash computation. If \"auto\",\n it will be set to default value on CPU(1024), or auto calculated per\n available GPU memory and memory_per_sample setting for GPU.\n memory_per_sample (typing.Optional[float]): estimated memory needed per sample in MB.\n Used to calculate batch size based on available GPU memory.\n Default is 0.1 MB per sample.\n"}, {"index": 10, "class_name": "ray_document_deduplicator", "class_desc": "Deduplicates samples at the document level using exact matching in Ray distributed mode.\n\n This operator computes a hash for each document and filters out duplicates based on\n exact matches. The hash is calculated from the text content, which can be optionally\n converted to lowercase and stripped of non-alphabet characters. The key metric used for\n deduplication is the MD5 hash of the processed text. If the `lowercase` parameter is\n set, the text is converted to lowercase before hashing. If `ignore_non_character` is\n enabled, all non-alphabet characters, including whitespaces, digits, and punctuation,\n are removed. 
The operator supports two backends: 'ray_actor' and 'redis', with the\n default being 'ray_actor'.", "arguments": " backend (): the backend for dedup, either 'ray_actor' or 'redis'\n redis_address (): the address of redis server\n lowercase (): Whether to convert sample text to lower case\n ignore_non_character (): Whether to ignore non-alphabet\n characters, including whitespaces, digits, and punctuations\n"}, {"index": 11, "class_name": "ray_image_deduplicator", "class_desc": "Deduplicates samples at the document level using exact matching of images in Ray distributed mode.\n\n This operator uses a specified hash method to compute image hashes and identifies\n duplicates by comparing these hashes. It operates in Ray distributed mode, supporting\n 'ray_actor' or 'redis' backends for deduplication. The hash method can be set during\n initialization, with supported methods listed in `HASH_METHOD`. If a sample does not\n contain an image, it is assigned an empty hash value. The operator loads images from the\n specified keys and computes their combined hash for comparison.", "arguments": " backend (): the backend for dedup, either 'ray_actor' or 'redis'\n redis_address (): the address of redis server\n method (): the hash method to use\n"}, {"index": 12, "class_name": "ray_video_deduplicator", "class_desc": "Deduplicates samples at document-level using exact matching of videos in Ray distributed mode.\n\n This operator computes the MD5 hash of video streams in each sample and compares them to\n identify duplicates. It uses Ray distributed mode for parallel processing. The hash is\n computed by demuxing the video streams and updating the MD5 hash with each video packet.\n If a sample does not contain a valid video, it is assigned an empty hash value. The\n operator supports 'ray_actor' or 'redis' backends for deduplication.", "arguments": " backend (): the backend for dedup, either 'ray_actor' or 'redis'\n redis_address (): the address of redis server\n"}, {"index": 13, "class_name": "video_deduplicator", "class_desc": "Deduplicates samples at the document level using exact matching of videos.\n\n This operator computes a hash for each video in the sample and uses it to identify and\n remove duplicate documents. If `consider_text` is set to True, it also considers the\n text hash alongside the video hash for deduplication. The video hash is computed by\n hashing the video data, including all video streams in the container. The operator\n supports sampling and tracing of duplicate pairs when the `show_num` parameter is\n greater than 0. Important fields used for caching include 'videohash' and optionally\n 'hash' if text is considered.", "arguments": " consider_text (): whether to consider text hash together with video\n hash when applying deduplication.\n"}, {"index": 14, "class_name": "alphanumeric_filter", "class_desc": "Filter to keep samples with an alphabet/numeric ratio within a specific range.\n\n This operator filters samples based on the ratio of alphanumeric characters or tokens.\n It keeps samples where the ratio of alphanumeric characters (or tokens) to the total\n number of characters (or tokens) is within the specified range. The ratio is computed\n either character-based or token-based, depending on the `tokenization` parameter. If\n `tokenization` is True, it uses a Hugging Face tokenizer to count tokens. The key metric\n used for filtering is 'alpha_token_ratio' if tokenization is enabled, otherwise\n 'alnum_ratio'. 
The operator caches these metrics in the stats field for each sample.", "arguments": " tokenization (): Whether to count the ratio of alphanumeric\n to the total number of tokens. if tokenization=False, it\n will count the ratio of alphanumeric to the total number of\n characters.\n min_ratio (): The min filter ratio in alphanumeric op,\n samples will be filtered if their alphabet/numeric ratio is\n below this parameter.\n max_ratio (): The max filter ratio in alphanumeric op,\n samples will be filtered if their alphabet/numeric ratio\n exceeds this parameter.\n"}, {"index": 15, "class_name": "audio_duration_filter", "class_desc": "Keep data samples whose audio durations are within a specified range.\n\n This operator filters data samples based on the duration of their audio files. It keeps\n samples where the audio duration is between a minimum and maximum value, in seconds. The\n operator supports two strategies for keeping samples: 'any' (keep if any audio meets the\n condition) or 'all' (keep only if all audios meet the condition). The audio duration is\n computed using the `librosa` library. If the audio duration has already been computed,\n it is retrieved from the sample's stats under the key 'audio_duration'. If no audio is\n present in the sample, an empty array is stored in the stats.", "arguments": " min_duration (): The min audio duration to keep samples in seconds.\n It's 0 by default.\n max_duration (): The max audio duration to keep samples in seconds.\n It's sys.maxsize by default.\n any_or_all (): keep this sample with 'any' or 'all' strategy of\n all audios. 'any': keep this sample if any audios meet the\n condition. 'all': keep this sample only if all audios meet the\n condition.\n"}, {"index": 16, "class_name": "audio_nmf_snr_filter", "class_desc": "Keep data samples whose audio Signal-to-Noise Ratios (SNRs) are within a specified\n range.\n\n This operator computes the SNR of each audio in a sample using Non-negative Matrix\n Factorization (NMF). It then filters the samples based on whether their SNRs fall within\n the given minimum and maximum thresholds. The SNR is computed for each audio, and the\n filtering strategy can be set to either 'any' or 'all'. In 'any' mode, a sample is kept\n if at least one of its audios meets the SNR criteria. In 'all' mode, all audios must\n meet the criteria for the sample to be kept. The NMF computation uses a specified number\n of iterations. If no audio is present in the sample, the SNR is recorded as an empty\n array. The key metric is stored in the 'audio_nmf_snr' field.", "arguments": " min_snr (): The min audio SNR to keep samples in dB. It's 0 by\n default.\n max_snr (): The max audio SNR to keep samples in dB. It's\n sys.maxsize by default.\n nmf_iter_num (typing.Annotated[int, Gt(gt=0)]): The max number of iterations to run NMF. It's 500\n in default.\n any_or_all (): keep this sample with 'any' or 'all' strategy of\n all audios. 'any': keep this sample if any audios meet the\n condition. 'all': keep this sample only if all audios meet the\n condition.\n"}, {"index": 17, "class_name": "audio_size_filter", "class_desc": "Keep data samples based on the size of their audio files.\n\n This operator filters data samples by checking if the size of their audio files falls\n within a specified range. The size can be in bytes, kilobytes, megabytes, or any other\n unit. The key metric used is 'audio_sizes', which is an array of file sizes in bytes. If\n no audio files are present, the 'audio_sizes' field will be an empty array. 
The operator\n supports two strategies for keeping samples: 'any' and 'all'. In 'any' mode, a sample is\n kept if at least one of its audio files meets the size criteria. In 'all' mode, all\n audio files must meet the size criteria for the sample to be kept.", "arguments": " min_size (): The min audio size to keep samples. set to be \"0\" by\n default for no size constraint\n max_size (): The max audio size to keep samples. set to be\n \"1Tb\" by default, an approximation for the unlimited case\n any_or_all (): keep this sample with 'any' or 'all' strategy of\n all audios. 'any': keep this sample if any audios meet the\n condition. 'all': keep this sample only if all audios meet the\n condition.\n"}, {"index": 18, "class_name": "average_line_length_filter", "class_desc": "Filter to keep samples with average line length within a specific range.\n\n This operator filters out samples based on their average line length. It keeps samples\n where the average line length is between the specified minimum and maximum values. The\n average line length is calculated as the total text length divided by the number of\n lines. If the context is provided, it uses precomputed lines from the context. The\n computed average line length is stored in the 'avg_line_length' key in the stats field.", "arguments": " min_len (): The min filter length in this op, samples will\n be filtered if their average line length is below this\n parameter.\n max_len (): The max filter length in this op, samples will\n be filtered if their average line length exceeds this\n parameter.\n"}, {"index": 19, "class_name": "character_repetition_filter", "class_desc": "Filter to keep samples with character-level n-gram repetition ratio within a specific\n range.\n\n This operator calculates the character-level n-gram repetition ratio for each sample and\n filters out samples that do not fall within the specified range. The repetition ratio is\n computed based on the frequency of n-grams in the text. The key metric 'char_rep_ratio'\n is cached in the stats field. Samples are kept if their 'char_rep_ratio' is between the\n specified min and max ratios. The n-gram length, minimum, and maximum ratios are\n configurable.", "arguments": " rep_len (typing.Annotated[int, Gt(gt=0)]): Repetition length for char-level n-gram.\n min_ratio (): The min filter ratio in this op, samples will\n be filtered if their char-level n-gram repetition ratio is\n below this parameter.\n max_ratio (): The max filter ratio in this op, samples will\n be filtered if their char-level n-gram repetition ratio\n exceeds this parameter.\n"}, {"index": 20, "class_name": "flagged_words_filter", "class_desc": "Filter to keep samples with flagged-word ratio in a specified range.\n\n This operator filters out samples based on the ratio of flagged words. It uses a list of\n flagged words, which can be language-specific or combined from multiple languages. The\n flagged-word ratio is computed as the number of flagged words divided by the total\n number of words in the sample. If tokenization is enabled, a Hugging Face tokenizer is\n used to split the text into words. The operator supports word augmentation for certain\n languages, which can be configured. The key metric, 'flagged_words_ratio', is cached and\n reused if already computed. Samples are kept if their flagged-word ratio falls within\n the specified min and max ratio.", "arguments": " lang (): Consider flagged words in what language. 
If lang ==\n \"all\", we will adopt the one merged from all the available\n languages\n tokenization (): Whether to use model to tokenize documents\n min_ratio (): The min filter ratio in this op.\n max_ratio (): The max filter ratio in this op.\n flagged_words_dir (): The directory storing the\n flagged_words file(s) whose name includes \"flagged_words\"\n and in json format\n use_words_aug (): Whether to augment words, especially for\n Chinese and Vietnamese\n words_aug_group_sizes (typing.List[typing.Annotated[int, Gt(gt=0)]]): The group size of words to augment\n words_aug_join_char (): The join char between words to\n augment\n"}, {"index": 21, "class_name": "general_field_filter", "class_desc": "Filter to keep samples based on a general field filter condition.\n\n The filter condition is a string that can include logical operators (and/or) and chain\n comparisons. For example: \"10 < num <= 30 and text != 'nothing here' and __dj__meta__.a\n == 3\". The condition is evaluated for each sample, and only samples that meet the\n condition are kept. The result of the filter condition is stored in the sample's stats\n under the key 'general_field_filter_condition'. If the filter condition is empty or\n already computed, the sample is not re-evaluated.", "arguments": " filter_condition (): The filter condition as a string.\n It can include logical operators (and/or) and chain comparisons.\n For example: \"10 < num <= 30 and text != 'nothing here' and __dj__meta__.a == 3\".\n"}, {"index": 22, "class_name": "image_aesthetics_filter", "class_desc": "Filter to keep samples with aesthetics scores within a specific range.\n\n This operator uses a Hugging Face model to predict the aesthetics score of images. It\n keeps samples where the predicted scores fall within the specified min and max score\n range. The operator supports two strategies: 'any' (keep if any image meets the\n condition) and 'all' (keep only if all images meet the condition). Aesthetics scores are\n cached in the 'image_aesthetics_scores' field. If no images are present, the sample is\n kept. Scores are normalized by dividing by 10 if the model name includes\n 'shunk031/aesthetics-predictor'.", "arguments": " hf_scorer_model (): Huggingface model name for the aesthetics\n predictor. By default, we will use\n 'shunk031/aesthetics-predictor-v2-sac-logos-ava1-l14-linearMSE',\n refer to pypi.org/project/simple-aesthetics-predictor\n trust_remote_code ()\n min_score (): Min score for the predicted aesthetics in an image.\n max_score (): Max score for the predicted aesthetics in an image.\n any_or_all (): Keep this sample with 'any' or 'all' strategy of\n all images. 'any': keep this sample if any images meet the\n condition. 'all': keep this sample only if all images meet the\n condition.\n"}, {"index": 23, "class_name": "image_aspect_ratio_filter", "class_desc": "Filter to keep samples with image aspect ratio within a specific range.\n\n The operator computes the aspect ratio for each image in the sample, defined as the\n width divided by the height (W / H). It caches the computed aspect ratios in the\n 'aspect_ratios' field. Samples are kept if their images' aspect ratios fall within the\n specified minimum and maximum range. The 'any_or_all' parameter determines the strategy:\n 'any' keeps samples if at least one image meets the criteria, while 'all' requires all\n images to meet the criteria. 
If no images are present in a sample, the sample is not\n filtered out.", "arguments": " min_ratio (): The min aspect ratio to keep samples.\n max_ratio (): The max aspect ratio to keep samples.\n any_or_all (): keep this sample with 'any' or 'all' strategy of\n all images. 'any': keep this sample if any images meet the\n condition. 'all': keep this sample only if all images meet the\n condition.\n"}, {"index": 24, "class_name": "image_face_count_filter", "class_desc": "Filter to keep samples with the number of faces within a specific range.\n\n This operator uses an OpenCV classifier for face detection. It filters samples based on\n the number of faces detected in the images, keeping only those with a face count within\n the specified range. The operator supports two strategies: 'any' (keep if any image\n meets the condition) and 'all' (keep only if all images meet the condition). The face\n counts are cached in the 'face_counts' field. If no images are present in the sample,\n the face count is set to an empty array.", "arguments": " cv_classifier (): OpenCV classifier path for face detection.\n By default, we will use 'haarcascade_frontalface_alt.xml'.\n min_face_count (): Minimum number of faces required for samples.\n max_face_count (): Maximum number of faces required for samples.\n any_or_all (): Keep this sample with 'any' or 'all' strategy of\n all images. 'any': keep this sample if any images meet the\n condition. 'all': keep this sample only if all images meet the\n condition.\n"}, {"index": 25, "class_name": "image_face_ratio_filter", "class_desc": "Filter to keep samples with face area ratios within a specific range.\n\n This operator filters samples based on the ratio of the largest face area to the total\n image area. It uses an OpenCV classifier for face detection. The key metric,\n 'face_ratios', is computed for each image in the sample. Samples are kept if the face\n area ratios fall within the specified min and max ratio range. The filtering strategy\n can be set to 'any' (keep if any image meets the condition) or 'all' (keep only if all\n images meet the condition). If no images are present in the sample, the sample is\n retained.", "arguments": " cv_classifier (): OpenCV classifier path for face detection.\n By default, we will use 'haarcascade_frontalface_alt.xml'.\n min_ratio (): Min ratio for the largest face area in an image.\n max_ratio (): Max ratio for the largest face area in an image.\n any_or_all (): Keep this sample with 'any' or 'all' strategy of\n all images. 'any': keep this sample if any images meet the\n condition. 'all': keep this sample only if all images meet the\n condition.\n"}, {"index": 26, "class_name": "image_nsfw_filter", "class_desc": "Filter to keep samples whose images have nsfw scores in a specified range.\n\n This operator uses a Hugging Face model to compute the nsfw scores for each image in a\n sample. It keeps samples based on the specified `min_score` and `max_score` thresholds.\n The operator supports two strategies: 'any' (keep the sample if any image meets the\n condition) or 'all' (keep the sample only if all images meet the condition). 
The nsfw\n scores are cached in the 'image_nsfw_score' field of the sample's stats.", "arguments": " hf_nsfw_model (): nsfw detection model name on huggingface.\n trust_remote_code ()\n min_score (): the min nsfw score threshold for samples.\n range from 0 to 1.\n max_score (): the max nsfw score threshold for samples.\n range from 0 to 1.\n any_or_all (): keep this sample with 'any' or 'all' strategy of\n all images. 'any': keep this sample if any images meet the\n condition. 'all': keep this sample only if all images meet the\n condition.\n"}, {"index": 27, "class_name": "image_pair_similarity_filter", "class_desc": "Filter to keep image pairs with similarities between images within a specific range.\n\n This operator uses a Hugging Face CLIP model to compute the cosine similarity between\n two images in each sample. It retains samples where the similarity score falls within\n the specified minimum and maximum thresholds. The 'any' strategy keeps a sample if any\n of the image pairs meet the condition, while the 'all' strategy requires all image pairs\n to meet the condition. The similarity scores are cached in the 'image_pair_similarity'\n field. Each sample must include exactly two distinct images.", "arguments": " hf_clip (): clip model name on huggingface to compute\n the similarity between two images.\n trust_remote_code ()\n min_score (): The min similarity to keep samples.\n max_score (): The max similarity to keep samples.\n any_or_all (): keep this sample with 'any' or 'all' strategy of\n all images. 'any': keep this sample if any images meet the\n condition. 'all': keep this sample only if all images meet the\n condition.\n"}, {"index": 28, "class_name": "image_shape_filter", "class_desc": "Filter to keep samples with image shape (width, height) within specific ranges.\n\n This operator filters samples based on the width and height of images. It keeps samples\n where the image dimensions fall within the specified ranges. The operator supports two\n strategies: 'any' and 'all'. In 'any' mode, a sample is kept if at least one image meets\n the criteria. In 'all' mode, all images in the sample must meet the criteria for the\n sample to be kept. The image width and height are stored in the 'image_width' and\n 'image_height' fields of the sample's stats. If no images are present in the sample, the\n corresponding stats fields will be empty arrays.", "arguments": " min_width (): The min width to keep samples.\n max_width (): The max width to keep samples.\n min_height (): The min height to keep samples.\n max_height (): The max height to keep samples.\n any_or_all (): keep this sample with 'any' or 'all' strategy of\n all images. 'any': keep this sample if any images meet the\n condition. 'all': keep this sample only if all images meet the\n condition.\n"}, {"index": 29, "class_name": "image_size_filter", "class_desc": "Keep data samples whose image size (in Bytes/KB/MB/...) is within a specific range.\n\n This operator filters data samples based on the size of their images. It keeps samples\n if the image sizes fall within the specified minimum and maximum size range. The operator\n supports two strategies: 'any' (keep the sample if any image meets the size condition) and\n 'all' (keep the sample only if all images meet the size condition). If no images are\n present in the sample, the 'image_sizes' field will be an empty array.", "arguments": " min_size (): The min image size to keep samples. 
set to be \"0\" by\n default for no size constraint\n max_size (): The max image size to keep samples. set to be\n \"1TB\" by default, an approximation for the unlimited case\n any_or_all (): keep this sample with 'any' or 'all' strategy of\n all images. 'any': keep this sample if any images meet the\n condition. 'all': keep this sample only if all images meet the\n condition.\n"}, {"index": 30, "class_name": "image_text_matching_filter", "class_desc": "Filter to keep samples with image-text matching scores within a specific range.\n\n This operator uses a Hugging Face BLIP model to compute the matching score between\n images and text. It keeps samples where the matching score falls within the specified\n `min_score` and `max_score` range. The key metric, `image_text_matching_score`, is\n computed for each image-text pair. If multiple images are associated with a single text,\n the scores can be reduced using 'avg', 'max', or 'min' modes. The operator supports\n horizontal and vertical flipping of images. Samples are kept based on either 'any' or\n 'all' strategy: 'any' keeps the sample if any image meets the condition, while 'all'\n keeps the sample only if all images meet the condition.", "arguments": " hf_blip (): blip model name on huggingface to compute\n the matching score between image and text.\n trust_remote_code ()\n min_score (): The min matching score to keep samples.\n max_score (): The max matching score to keep samples.\n horizontal_flip (): Flip image horizontally (left to right).\n vertical_flip (): Flip image vertically (top to bottom).\n any_or_all (): keep this sample with 'any' or 'all' strategy of\n all images. 'any': keep this sample if any images meet the\n condition. 'all': keep this sample only if all images meet the\n condition.\n reduce_mode (): reduce mode when one text corresponds to\n multiple images in a chunk.\n 'avg': Take the average of multiple values\n 'max': Take the max of multiple values\n 'min': Take the min of multiple values\n"}, {"index": 31, "class_name": "image_text_similarity_filter", "class_desc": "Filter to keep samples with image-text similarity within a specified range.\n\n This operator uses a Hugging Face CLIP model to compute the similarity between images\n and text. It retains samples where the similarity scores fall within the given range.\n The similarity score is computed for each image-text pair, and the final score can be\n reduced using 'avg', 'max', or 'min' modes. The 'any' or 'all' strategy determines if at\n least one or all image-text pairs must meet the similarity criteria. The key metric\n 'image_text_similarity' is cached in the sample's stats. Images can be flipped\n horizontally or vertically before computing the similarity.", "arguments": " hf_clip (): clip model name on huggingface to compute\n the similarity between image and text.\n trust_remote_code ()\n min_score (): The min similarity to keep samples.\n max_score (): The max similarity to keep samples.\n horizontal_flip (): Flip image horizontally (left to right).\n vertical_flip (): Flip image vertically (top to bottom).\n any_or_all (): keep this sample with 'any' or 'all' strategy of\n all images. 'any': keep this sample if any images meet the\n condition. 
'all': keep this sample only if all images meet the\n condition.\n reduce_mode (): reduce mode when one text corresponds to\n multiple images in a chunk.\n 'avg': Take the average of multiple values\n 'max': Take the max of multiple values\n 'min': Take the min of multiple values\n"}, {"index": 32, "class_name": "image_watermark_filter", "class_desc": "Filter to keep samples whose images have no watermark with high probability.\n\n This operator uses a Hugging Face watermark detection model to filter samples based on\n the presence of watermarks in their images. It keeps samples where the predicted\n watermark probability is below a specified threshold. The operator supports two\n strategies: 'any' (keep if any image meets the condition) and 'all' (keep only if all\n images meet the condition). The key metric 'image_watermark_prob' is computed for each\n image, representing the probability that the image contains a watermark. If no images\n are present in the sample, the metric is set to an empty array.", "arguments": " hf_watermark_model (): watermark detection model name on\n huggingface.\n trust_remote_code ()\n prob_threshold (): the predicted watermark probability threshold\n for samples. range from 0 to 1. Samples with watermark probability\n less than this threshold will be kept.\n any_or_all (): keep this sample with 'any' or 'all' strategy of\n all images. 'any': keep this sample if any images meet the\n condition. 'all': keep this sample only if all images meet the\n condition.\n"}, {"index": 33, "class_name": "llm_perplexity_filter", "class_desc": "Filter to keep samples with perplexity scores within a specified range, computed using a\n specified LLM.\n\n This operator computes the perplexity score for each sample using a Hugging Face LLM. It\n then filters the samples based on whether their perplexity scores fall within the\n specified minimum and maximum score range. The perplexity score is calculated as the\n exponential of the loss value from the LLM. The operator uses a query and response\n template to format the input text for the LLM. If the perplexity score is not already\n cached in the sample's stats under the key 'llm_perplexity', it will be computed.", "arguments": " hf_model (): huggingface embedding model name.\n model_params (typing.Optional[typing.Dict]): Parameters for initializing the API model.\n min_score (): Minimum perplexity score.\n max_score (): Maximum perplexity score.\n query_template (typing.Optional[str]): Template for building the query string.\n response_template (typing.Optional[str]): Template for building the response string.\n"}, {"index": 34, "class_name": "in_context_influence_filter", "class_desc": "Filter to keep texts based on their in-context influence on a validation set.\n\n This operator calculates the in-context influence of each sample by comparing\n perplexities with and without the sample as context. The influence score is computed as\n the ratio of these perplexities. If `valid_as_demo` is True, the score is L(A|Q) /\n L(A|task_desc, Q_v, A_v, Q). Otherwise, it is L(A_v|Q) / L(A_v|task_desc, Q, A, Q_v).\n The operator retains samples whose in-context influence score is within a specified\n range. The in-context influence score is stored in the 'in_context_influence' field of\n the sample's stats. 
The validation set must be prepared using the\n `prepare_valid_feature` method if not provided during initialization.", "arguments": " valid_dataset (typing.Optional[typing.List[typing.Dict]]): The dataset to use for validation.\n If None, 'self.prepare_valid_feature' should be manually called before applying the filter.\n task_desc (): The description of the validation task.\n valid_as_demo (): If true, score = L(A|Q) / L(A|task_desc, Q_v, A_v, Q);\n If false, score = L(A_v|Q) / L(A_v|task_desc, Q, A, Q_v).\n n_shot (typing.Optional[int]): The number of shots in validation.\n"}, {"index": 35, "class_name": "instruction_following_difficulty_filter", "class_desc": "Filter to keep texts based on their instruction following difficulty (IFD,\n https://arxiv.org/abs/2308.12032) score.\n\n This operator computes the IFD score for each sample, which is the ratio of the loss\n with and without the query. It keeps samples where the IFD score falls within a\n specified range. The IFD score is calculated using a Hugging Face tokenizer and model.\n If the IFD score is already cached in the 'ifd_score' field, it will be reused. The\n operator decides to keep or filter samples based on the provided minimum and maximum IFD\n score thresholds.", "arguments": " hf_model (): huggingface embedding model name.\n model_params (typing.Optional[typing.Dict]): Parameters for initializing the API model.\n min_score (): Minimum IFD score.\n max_score (): Maximum IFD score.\n query_template (typing.Optional[str]): Template for building the query string.\n response_template (typing.Optional[str]): Template for building the response string.\n"}, {"index": 36, "class_name": "language_id_score_filter", "class_desc": "Filter to keep samples in a specific language with a confidence score above a threshold.\n\n This operator uses a FastText model to identify the language of each sample. It keeps\n samples that are in the specified language(s) and have a language identification\n confidence score greater than or equal to the minimum score. If no specific language is\n provided, it only filters based on the confidence score. The language ID and its\n confidence score are stored in the 'lang' and 'lang_score' fields of the sample's stats,\n respectively.", "arguments": " lang (typing.Union[str, typing.List[str]]): Samples in which languages to keep.\n min_score (): The min language identification confidence\n scores of samples to keep.\n"}, {"index": 37, "class_name": "llm_analysis_filter", "class_desc": "Base filter class for leveraging LLMs to analyze and filter data samples.\n\n This operator uses an LLM to score and tag each sample across multiple quality\n dimensions. It supports both API-based and Hugging Face models. The LLM evaluates the\n sample on clarity, relevance, usefulness, and fluency, providing scores from 1 to 5.\n Tags are assigned to categorize the sample, and a recommendation is made to keep,\n review, or discard the sample. The average score is computed based on the required\n dimension keys. Samples are kept if their average score falls within the specified min\n and max score thresholds. 
The key metric 'llm_analysis_score' is cached in the sample's\n stats.", "arguments": " api_or_hf_model (): API or huggingface model name.\n min_score (): The min score threshold to keep the sample.\n max_score (): The max score threshold to keep the sample.\n is_hf_model (): If true, use Transformers for loading hugging face or\n local llm.\n api_endpoint (typing.Optional[str]): URL endpoint for the API.\n response_path (typing.Optional[str]): Path to extract content from the API response.\n Defaults to 'choices.0.message.content'.\n input_keys (typing.List[str]): Subset of keys in the sample. Support data with\n multi fields such as 'query', 'analysis' and 'answer' in RFT data.\n field_names (typing.List[str]): Corresponding field names for input keys.\n system_prompt (typing.Optional[str]): System prompt for the task.\n input_template (typing.Optional[str]): Template for building the model input.\n field_template (typing.Optional[str]): Template for each field in the prompt.\n try_num (typing.Annotated[int, Gt(gt=0)]): The number of retry attempts when there is an API\n call error or output parsing error.\n enable_vllm (): If true, use VLLM for loading hugging face or\n local llm.\n model_params (typing.Dict): Parameters for initializing the API model.\n sampling_params (typing.Dict): Extra parameters passed to the API call.\n e.g {'temperature': 0.9, 'top_p': 0.95}\n dim_required_keys (typing.Optional[typing.List[str]]): A list of keys used to calculate the average\n dimension score, only the dimension scores associated with these\n keys are used in the average calculation.\n"}, {"index": 38, "class_name": "llm_difficulty_score_filter", "class_desc": "Filter to keep samples with high difficulty scores estimated by an LLM.\n\n This operator uses a Hugging Face LLM to evaluate the difficulty of each sample. The LLM\n analyzes the sample across multiple dimensions, including linguistic complexity,\n conceptual depth, prior knowledge, step complexity, and ambiguity. Each dimension is\n scored on a 1-5 scale, with 5 being the highest difficulty. The final difficulty score\n is computed as the average of these dimension scores. Samples are kept if their\n difficulty score falls within the specified range (min_score to max_score). The key\n metric 'llm_difficulty_score' is stored in the sample's stats, along with detailed\n records and flags.", "arguments": " api_or_hf_model (): API or huggingface model name.\n min_score (): The min score threshold to keep the sample.\n max_score (): The max score threshold to keep the sample.\n is_hf_model (): If true, use Transformers for loading hugging face or\n local llm.\n api_endpoint (typing.Optional[str]): URL endpoint for the API.\n response_path (typing.Optional[str]): Path to extract content from the API response.\n Defaults to 'choices.0.message.content'.\n input_keys (typing.List[str]): Subset of keys in the sample. 
Support data with\n multi fields such as 'query', 'analysis' and 'answer' in RFT data.\n field_names (typing.List[str]): Corresponding field names for input keys.\n system_prompt (typing.Optional[str]): System prompt for the task.\n input_template (typing.Optional[str]): Template for building the model input.\n field_template (typing.Optional[str]): Template for each field in the prompt.\n try_num (typing.Annotated[int, Gt(gt=0)]): The number of retry attempts when there is an API\n call error or output parsing error.\n enable_vllm (): If true, use VLLM for loading hugging face or\n local llm.\n model_params (typing.Dict): Parameters for initializing the API model.\n sampling_params (typing.Dict): Extra parameters passed to the API call.\n e.g {'temperature': 0.9, 'top_p': 0.95}\n dim_required_keys (typing.Optional[typing.List[str]]): A list of keys used to calculate the average\n dimension score, only the dimension scores associated with these\n keys are used in the average calculation.\n"}, {"index": 39, "class_name": "llm_quality_score_filter", "class_desc": "Filter to keep samples with a high quality score estimated by a language model.\n\n This operator uses a language model to evaluate the quality of each sample across\n multiple dimensions, including accuracy, grammar, informativeness, and coherence. The\n LLM provides a numerical score for each dimension on a 1-5 scale, where 1 is the lowest\n and 5 is the highest. The overall quality score is used to decide whether to keep or\n filter out the sample based on the specified minimum and maximum score thresholds. The\n evaluation results are cached in the 'llm_quality_score' and 'llm_quality_record'\n fields. Important flags and tags from the LLM's analysis may also be stored in the\n sample's stats.", "arguments": " api_or_hf_model (): API or huggingface model name.\n min_score (): The min score threshold to keep the sample.\n max_score (): The max score threshold to keep the sample.\n is_hf_model (): If true, use Transformers for loading hugging face or\n local llm.\n api_endpoint (typing.Optional[str]): URL endpoint for the API.\n response_path (typing.Optional[str]): Path to extract content from the API response.\n Defaults to 'choices.0.message.content'.\n input_keys (typing.List[str]): Subset of keys in the sample. 
Support data with\n multi fields such as 'query', 'analysis' and 'answer' in RFT data.\n field_names (typing.List[str]): Corresponding field names for input keys.\n system_prompt (typing.Optional[str]): System prompt for the task.\n input_template (typing.Optional[str]): Template for building the model input.\n field_template (typing.Optional[str]): Template for each field in the prompt.\n try_num (typing.Annotated[int, Gt(gt=0)]): The number of retry attempts when there is an API\n call error or output parsing error.\n enable_vllm (): If true, use VLLM for loading hugging face or\n local llm.\n model_params (typing.Dict): Parameters for initializing the API model.\n sampling_params (typing.Dict): Extra parameters passed to the API call.\n e.g {'temperature': 0.9, 'top_p': 0.95}\n dim_required_keys (typing.Optional[typing.List[str]]): A list of keys used to calculate the average\n dimension score, only the dimension scores associated with these\n keys are used in the average calculation.\n"}, {"index": 40, "class_name": "llm_task_relevance_filter", "class_desc": "Filter to keep samples with high relevance scores to validation tasks estimated by an\n LLM.\n\n This operator evaluates the relevance of each sample to a specified validation task\n using an LLM. The LLM scores the sample on multiple dimensions, including topical\n relevance, linguistic style match, task match, knowledge alignment, and potential\n utility. Each dimension is scored on a 1-5 scale, with 5 being the highest. The key\n metric, 'llm_task_relevance', is the average score across these dimensions. Samples are\n kept if their average score meets or exceeds the specified minimum threshold. The\n operator uses either an API or a Hugging Face model for evaluation. If no validation\n dataset or task description is provided, the 'prepare_valid_feature' method must be\n called manually before applying the filter.", "arguments": " api_or_hf_model (): API or huggingface model name.\n min_score (): The lowest score threshold to keep the sample.\n is_hf_model (): Indicates if the model is from HuggingFace.\n valid_dataset (typing.Optional[typing.List[typing.Dict]]): The dataset to use for validation.\n task_desc (typing.Optional[str]): The description of the validation task.\n If valid_dataset=None and task_desc=None,\n 'self.prepare_valid_feature' should be manually called before applying the filter.\n n_shot (typing.Optional[int]): The number of shots in validation.\n"}, {"index": 41, "class_name": "maximum_line_length_filter", "class_desc": "Filter to keep samples with a maximum line length within a specified range.\n\n This operator filters out samples based on the length of their longest line. It retains\n samples where the maximum line length is within the specified `min_len` and `max_len`\n range. The maximum line length is computed by splitting the text into lines and\n measuring the length of each line. If the context is provided, it uses precomputed lines\n stored under the key 'lines' in the context. 
The maximum line length is cached in the\n 'max_line_length' field of the stats.", "arguments": " min_len (): The min filter length in this op, samples will\n be filtered if their maximum line length is below this\n parameter.\n max_len (): The max filter length in this op, samples will\n be filtered if their maximum line length exceeds this\n parameter.\n"}, {"index": 42, "class_name": "perplexity_filter", "class_desc": "Filter to keep samples with perplexity score in a specified range.\n\n This operator computes the perplexity of text samples using a Hugging Face tokenizer and\n a KenLM language model. It keeps samples with perplexity scores within the specified\n minimum and maximum values. The perplexity is calculated character-based by default. If\n the perplexity is already computed, it will be reused from the 'perplexity' field in the\n sample's stats. The operator supports batched operations for efficiency.", "arguments": " lang (): Compute perplexity for samples in which language.\n min_ppl (): The min filter perplexity in this op.\n max_ppl (): The max filter perplexity in this op.\n"}, {"index": 43, "class_name": "phrase_grounding_recall_filter", "class_desc": "Filter to keep samples based on the phrase grounding recall of phrases extracted from\n text in images.\n\n This operator uses a Hugging Face Owl-ViT model to locate phrases extracted from the\n text within the images. It keeps samples where the phrase grounding recall is within a\n specified range. The recall is computed by comparing the number of correctly located\n phrases to the total number of phrases. The operator can handle multiple images per text\n chunk and supports different strategies for reducing the recall values (e.g., average,\n max, min). It also allows for flipping images horizontally or vertically. The key metric\n 'phrase_grounding_recall' is computed and stored in the sample's stats. If no images are\n present, the recall is set to an empty array.", "arguments": " hf_owlvit (): Owl-ViT model name on huggingface to locate the\n phrases extracted from the text.\n trust_remote_code ()\n min_recall (): The min phrase grounding recall to keep samples.\n max_recall (): The max phrase grounding recall to keep samples.\n horizontal_flip (): Flip image horizontally (left to right).\n vertical_flip (): Flip image vertically (top to bottom).\n any_or_all (): keep this sample with 'any' or 'all' strategy of\n all images. 'any': keep this sample if any images meet the\n condition. 'all': keep this sample only if all images meet the\n condition.\n reduce_mode (): reduce mode when one text corresponds to\n multiple images in a chunk.\n 'avg': Take the average of multiple values\n 'max': Take the max of multiple values\n 'min': Take the min of multiple values\n iou_thr (): the IoU threshold for NMS-like post-process. If two\n predicted bboxes are overlap with an IoU larger than this\n threshold, the bbox with less confidence will be removed. Default:\n 0.5.\n large_area_ratio_thr (): the area ratio threshold for filtering out\n those large predicted bboxes. If the area of a predicted bbox\n accounts for more than this ratio threshold of the whole image\n area, this bbox will be removed. Default: 0.95.\n conf_thr (): the confidence score threshold for removing\n low-confidence bboxes. If the confidence score of a predicted bbox\n is lower than the threshold, this bbox will be removed. 
Default: 0.\n"}, {"index": 44, "class_name": "special_characters_filter", "class_desc": "Filter to keep samples with special-character ratio within a specific range.\n\n This operator filters out samples based on the ratio of special characters in the text.\n It keeps samples where the special-character ratio is within the specified minimum and\n maximum thresholds. The special-character ratio is computed as the number of special\n characters divided by the total number of characters in the text. If the\n 'special_char_ratio' is already cached in the stats, it will be reused. Otherwise, it\n will be computed and stored in the 'special_char_ratio' field.", "arguments": " min_ratio (): The min filter ratio in this op, samples will\n be filtered if their special-char ratio is below this\n parameter.\n max_ratio (): The max filter ratio in this op, samples will\n be filtered if their special-char ratio exceeds this\n parameter.\n"}, {"index": 45, "class_name": "specified_field_filter", "class_desc": "Filter samples based on the specified field information.\n\n This operator checks if the value of a specified field in each sample is within a given\n target value range. If the field value is not within the target range, the sample is\n filtered out. The field can be a multi-level key, with levels separated by dots. The\n target value is a list of acceptable values for the field. If the field value is not a\n list or tuple, it is converted to a list for comparison. Samples are retained if all\n values in the field match any of the target values.\n\n - Uses the 'field_key' and 'target_value' parameters.\n - Supports multi-level field keys, e.g., 'level1.level2'.\n - Converts non-list/tuple field values to a list for comparison.", "arguments": " field_key (): Filter based on the specified value\n corresponding to the target key. The target key\n corresponding to multi-level field information needs to be\n separated by '.'.\n target_value (typing.List): The range of specified field information\n corresponding to the samples that need to be retained.\n"}, {"index": 46, "class_name": "specified_numeric_field_filter", "class_desc": "Filter samples based on a specified numeric field value.\n\n This operator filters out samples if the numeric value in the specified field is not\n within the given range. The field can be multi-level, with keys separated by dots. The\n sample is kept if the numeric value is between the minimum and maximum values,\n inclusive. If the field key is not provided, all samples are retained. The operator\n ensures that the field exists in the sample and that its value is numeric before\n performing the comparison.\n\n - Uses 'min_value' and 'max_value' to define the acceptable range.\n - Supports multi-level fields using dot-separated keys.\n - Returns False for non-numeric or out-of-range values, filtering the sample.", "arguments": " field_key (): Filter based on the specified numeric value\n corresponding to the target key. 
The target key\n corresponding to multi-level field information needs to be\n separated by '.'.\n min_value (): The min filter value in SpecifiedNumericField\n op, samples will be filtered if their specified numeric\n field value is below this parameter.\n max_value (): The max filter value in SpecifiedNumericField\n op, samples will be filtered if their specified numeric\n field value exceeds this parameter.\n"}, {"index": 47, "class_name": "stopwords_filter", "class_desc": "Filter to keep samples with stopword ratio within a specified range.\n\n This operator calculates the ratio of stopwords in a sample and keeps samples where this\n ratio is between the specified minimum and maximum values. The stopword ratio is\n computed as the number of stopwords divided by the total number of words. If the\n `tokenization` parameter is set, a Hugging Face tokenizer is used to tokenize the text.\n The stopwords are loaded from a directory, and if the language is set to \"all\", it\n merges stopwords from all available languages. The key metric is `stopwords_ratio`,\n which is character-based by default. The operator also supports word augmentation for\n specific languages.", "arguments": " lang (): The language whose stopwords are considered. If lang ==\n \"all\", the list merged from all the available\n languages is adopted\n tokenization (): whether to use a model to tokenize documents\n min_ratio (): The min filter ratio in this op.\n max_ratio (): The max filter ratio in this op.\n stopwords_dir (): The directory storing the stopwords\n file(s) whose names include \"stopwords\" and are in JSON format\n use_words_aug (): Whether to augment words, especially for\n Chinese and Vietnamese\n words_aug_group_sizes (typing.List[typing.Annotated[int, Gt(gt=0)]]): The group size of words to augment\n words_aug_join_char (): The join char between words to\n augment\n"}, {"index": 48, "class_name": "suffix_filter", "class_desc": "Filter to keep samples with specified suffix.\n\n This operator retains samples that have a suffix matching any of the provided suffixes.\n If no suffixes are specified, all samples are kept. The key metric 'keep' is computed\n based on whether the sample's suffix matches the specified list. The 'suffix' field of\n each sample is checked against the list of allowed suffixes. If the suffix matches, the\n sample is kept; otherwise, it is filtered out.", "arguments": " suffixes (typing.Union[str, typing.List[str]]): the suffix of text that will be kept.\n For example: '.txt', 'txt' or ['txt', '.pdf', 'docx']\n"}, {"index": 49, "class_name": "text_action_filter", "class_desc": "Filter to keep texts that contain a minimum number of actions.\n\n This operator uses a spaCy model to detect actions in the text. It keeps samples if the\n number of detected actions meets or exceeds the specified minimum. The supported\n languages are English ('en') and Chinese ('zh'). The 'num_action' statistic is computed\n and cached for each sample. Actions are identified based on part-of-speech (POS) tags\n and specific tags for verbs.", "arguments": " lang (): language of the text in the samples. 'en' for detection of\n actions in English and 'zh' for detection of actions in Chinese.\n min_action_num (): The min action number in the filtering. 
Samples\n will be filtered if their action number in the text is below this\n parameter.\n"}, {"index": 50, "class_name": "text_embd_similarity_filter", "class_desc": "Filter to keep texts whose average embedding similarity to a set of given validation\n texts falls within a specific range.\n\n This operator computes the cosine similarity between the text embeddings and a set of\n validation text embeddings. It keeps samples where the average similarity score is\n within the specified range. The key metric, 'text_embd_similarity', is computed as the\n mean cosine similarity. The operator supports both API-based and Hugging Face model-\n based embeddings. If no valid dataset is provided, the `prepare_valid_feature` method\n must be called manually before applying the filter.", "arguments": " api_or_hf_model (): API or Hugging Face embedding model name.\n is_hf_model (): Indicates if the model is from Hugging Face.\n api_endpoint (): Embedding URL endpoint for the API.\n response_path (): Path to extract content from the API response.\n Defaults to 'data.0.embedding' for embedding model.\n model_params (typing.Optional[typing.Dict]): Parameters for initializing the API model.\n min_score (): The min average similarity to keep samples.\n max_score (): The max average similarity to keep samples.\n valid_dataset (typing.Optional[typing.List[typing.Dict]]): The dataset to use for validation.\n If None, 'self.prepare_valid_feature' should be manually called before applying the filter.\n ebd_dim (): The embedding's dimension via API.\n API-specific parameter, i.e., if is_hf_model=True, this parameter will not take effect.\n pooling (typing.Optional[str]): strategy to extract embedding from the hidden states. https://arxiv.org/abs/2503.01807\n None: default option, the hidden state of the last token.\n \"mean\": uniform mean of hidden states.\n \"weighted_mean\": weighted mean of hidden states. https://arxiv.org/abs/2202.08904\n HF_MODEL-specific parameter, i.e., if is_hf_model=False, this parameter will not take effect.\n input_template (typing.Optional[str]): Template for building the model input.\n"}, {"index": 51, "class_name": "text_entity_dependency_filter", "class_desc": "Identify and filter text samples based on entity dependencies.\n\n This operator uses a spaCy model to detect entities in the text and evaluates their\n dependency relationships. It filters out samples where entities have fewer than a\n specified number of dependency edges. The key metric is 'num_dependency_edges', which\n counts the number of edges for each entity in the dependency tree. Samples with no\n detected entities are omitted. The operator supports 'any' or 'all' strategies: 'any'\n keeps samples if at least one entity meets the dependency threshold, while 'all'\n requires all entities to meet the threshold. Supported languages are English ('en') and\n Chinese ('zh').", "arguments": " lang (): language of the text in the samples. 'en' for detection of\n entities in English and 'zh' for detection of entities in Chinese.\n min_dependency_num (): The min dependency edge number in the filtering.\n An entity is considered independent if its number of edges in the dependency\n tree is below this parameter.\n any_or_all (): keep this sample with 'any' or 'all' strategy.\n 'any': keep this sample if any entity is dependent. 
'all': keep\n this sample only if all entities are dependent.\n"}, {"index": 52, "class_name": "text_length_filter", "class_desc": "Filter to keep samples with total text length within a specific range.\n\n This operator filters out samples based on their total text length. It retains samples\n where the text length is between the specified minimum and maximum lengths. The text\n length is computed as the number of characters in the sample's text. If the 'text_len'\n key is already present in the sample's stats, it will be reused; otherwise, it will be\n computed. The operator processes samples in batches for efficiency.", "arguments": " min_len (): The min text length in the filtering. Samples\n will be filtered if their text length is below this\n parameter.\n max_len (): The max text length in the filtering. Samples\n will be filtered if their text length exceeds this\n parameter.\n"}, {"index": 53, "class_name": "text_pair_similarity_filter", "class_desc": "Filter to keep text pairs with similarities within a specific range.\n\n This operator computes the similarity between two texts in a pair using a Hugging Face\n CLIP model. It keeps samples where the similarity score falls within the specified min\n and max thresholds. The key metric, 'text_pair_similarity', is computed as the cosine\n similarity between the text embeddings. The operator supports two strategies for keeping\n samples: 'any' (keep if any pair meets the condition) and 'all' (keep only if all pairs\n meet the condition). If the second text key is not provided, the operator will raise an\n error. The similarity scores are cached under the 'text_pair_similarity' field in the\n sample's stats.", "arguments": " hf_clip (): CLIP model name on Hugging Face to compute\n the similarity between the two texts in a pair.\n trust_remote_code ()\n min_score (): The min similarity to keep samples.\n max_score (): The max similarity to keep samples.\n text_key_second (): used to store the other sentence\n in the text pair.\n any_or_all (): keep this sample with 'any' or 'all' strategy of\n all text pairs. 'any': keep this sample if any pairs meet the\n condition. 'all': keep this sample only if all pairs meet the\n condition.\n"}, {"index": 54, "class_name": "token_num_filter", "class_desc": "Filter to keep samples with a total token number within a specified range.\n\n This operator uses a Hugging Face tokenizer to count the number of tokens in each\n sample. It keeps samples where the token count is between the minimum and maximum\n thresholds. The token count is stored in the 'num_token' field of the sample's stats. If\n the token count is not already computed, it will be calculated using the specified\n tokenizer.", "arguments": " hf_tokenizer (): the tokenizer name of Hugging Face tokenizers.\n min_num (): The min filter token number in this op, samples\n will be filtered if their token number is below this\n parameter.\n max_num (): The max filter token number in this op, samples\n will be filtered if their token number exceeds this\n parameter.\n"}, {"index": 55, "class_name": "video_aesthetics_filter", "class_desc": "Filter to keep data samples with aesthetics scores for specified frames in the videos\n within a specific range.\n\n This operator evaluates the aesthetic quality of video frames using a Hugging Face\n model. It keeps samples where the aesthetics scores of the specified frames fall within\n a given range. 
The key metric, 'video_frames_aesthetics_score', is computed by\n averaging, taking the max, or min of the frame scores, depending on the reduce mode.\n Frame sampling can be done uniformly or by extracting all keyframes. The filter applies\n an 'any' or 'all' strategy to decide if a sample should be kept based on the scores of\n multiple videos.", "arguments": " hf_scorer_model (): Hugging Face model name for the aesthetics\n predictor. By default, we will use\n 'shunk031/aesthetics-predictor-v2-sac-logos-ava1-l14-linearMSE',\n refer to pypi.org/project/simple-aesthetics-predictor\n trust_remote_code ()\n min_score (): Min score for the predicted aesthetics in a video.\n max_score (): Max score for the predicted aesthetics in a video.\n frame_sampling_method (): sampling method of extracting frame\n images from the videos.\n Should be one of [\"all_keyframes\", \"uniform\"].\n The former one extracts all key frames and the latter one extracts a\n specified number of frames uniformly from the video.\n Default: \"uniform\" with frame_num=3, considering that the number of\n keyframes can be large while their difference is usually small\n in terms of their aesthetics.\n frame_num (typing.Annotated[int, Gt(gt=0)]): the number of frames to be extracted uniformly from\n the video. Only works when frame_sampling_method is \"uniform\". If\n it's 1, only the middle frame will be extracted. If it's 2, only\n the first and the last frames will be extracted. If it's larger\n than 2, in addition to the first and the last frames, other frames\n will be extracted uniformly within the video duration.\n any_or_all (): Keep this sample with 'any' or 'all' strategy of\n all videos. 'any': keep this sample if any videos meet the\n condition. 'all': keep this sample only if all videos meet the\n condition.\n reduce_mode (): reduce mode when one sample corresponds to\n multiple frames, must be one of ['avg', 'max', 'min'].\n 'avg': Take the average of multiple values\n 'max': Take the max of multiple values\n 'min': Take the min of multiple values\n"}, {"index": 56, "class_name": "video_aspect_ratio_filter", "class_desc": "Filter to keep samples with video aspect ratio within a specific range.\n\n This operator filters samples based on the aspect ratios of their videos. It keeps\n samples where the video aspect ratios fall within a specified range. The aspect ratio is\n calculated as the width divided by the height (W / H). The operator supports two\n strategies for keeping samples: 'any' and 'all'. In 'any' mode, a sample is kept if at\n least one video meets the aspect ratio condition. In 'all' mode, all videos in the\n sample must meet the condition for the sample to be kept. The aspect ratios are computed\n and stored in the 'video_aspect_ratios' field of the sample's stats.", "arguments": " min_ratio (): The minimum aspect ratio to keep samples;\n the supported format is a string, such as \"9:21\" or \"9/21\".\n max_ratio (): The maximum aspect ratio to keep samples;\n the supported format is a string, such as \"21:9\" or \"21/9\".\n any_or_all (): keep this sample with 'any' or 'all' strategy of\n all videos. 'any': keep this sample if any videos meet the\n condition. 'all': keep this sample only if all videos meet the\n condition.\n"}, {"index": 57, "class_name": "video_duration_filter", "class_desc": "Keep data samples whose videos' durations are within a specified range.\n\n This operator filters data samples based on the duration of their associated videos. 
It\n keeps samples where the video durations fall within a specified minimum and maximum\n range. The filtering strategy can be set to 'any' or 'all':\n - 'any': Keep the sample if any of its videos meet the duration criteria.\n - 'all': Keep the sample only if all of its videos meet the duration criteria.\n The video durations are computed and stored in the 'video_duration' field of the\n sample's stats. If no videos are present, an empty array is stored.", "arguments": " min_duration (): The min video duration, in seconds, to keep samples.\n It's 0 by default.\n max_duration (): The max video duration, in seconds, to keep samples.\n It's sys.maxsize by default.\n any_or_all (): keep this sample with 'any' or 'all' strategy of\n all videos. 'any': keep this sample if any videos meet the\n condition. 'all': keep this sample only if all videos meet the\n condition.\n"}, {"index": 58, "class_name": "video_frames_text_similarity_filter", "class_desc": "Filter to keep samples based on the similarity between video frame images and text\n within a specific range.\n\n This operator uses a Hugging Face CLIP model to compute the similarity between video\n frames and associated text. It keeps samples where the computed similarity scores fall\n within a specified range. The operator supports different frame sampling methods,\n including 'all_keyframes' and 'uniform', and allows for horizontal and vertical flipping\n of the frames. The similarity score is reduced using one of three modes: 'avg', 'max',\n or 'min'. The operator also supports two strategies for keeping samples: 'any' (keep if\n any video meets the condition) or 'all' (keep only if all videos meet the condition).\n The key metric is stored in the 'video_frames_text_similarity' field.", "arguments": " hf_clip (): CLIP model name on Hugging Face to compute\n the similarity between frame images and text. The choice is\n language-related. For example, for Chinese datasets, ChineseCLIP\n might be a better choice.\n trust_remote_code ()\n min_score (): the min similarity to keep samples.\n max_score (): the max similarity to keep samples.\n frame_sampling_method (): sampling method of extracting frame\n images from the videos.\n Should be one of [\"all_keyframes\", \"uniform\"].\n The former one extracts all key frames (the number of which depends\n on the duration of the video) and the latter one extracts a specified\n number of frames uniformly from the video.\n Default: \"all_keyframes\".\n frame_num (typing.Annotated[int, Gt(gt=0)]): the number of frames to be extracted uniformly from\n the video. Only works when frame_sampling_method is \"uniform\". If\n it's 1, only the middle frame will be extracted. If it's 2, only\n the first and the last frames will be extracted. If it's larger\n than 2, in addition to the first and the last frames, other frames\n will be extracted uniformly within the video duration.\n horizontal_flip (): flip frame image horizontally (left to right).\n vertical_flip (): flip frame image vertically (top to bottom).\n any_or_all (): keep this sample with 'any' or 'all' strategy of\n all videos. 'any': keep this sample if any videos meet the\n condition. 
'all': keep this sample only if all videos meet the\n condition.\n reduce_mode (): reduce mode when one text corresponds to\n multiple video frame images in a chunk.\n 'avg': Take the average of multiple values\n 'max': Take the max of multiple values\n 'min': Take the min of multiple values\n"}, {"index": 59, "class_name": "video_motion_score_filter", "class_desc": "Filter to keep samples with video motion scores within a specific range.\n\n The operator uses Farneback's algorithm from OpenCV to compute dense optical flow. It\n calculates the average motion score for each video and retains samples based on the\n specified minimum and maximum score thresholds. The 'any' or 'all' strategy determines\n whether to keep a sample if any or all videos meet the criteria. The motion score is\n computed as the mean magnitude of the optical flow, which can be normalized relative to\n the frame's diagonal length. The stats are cached under the key 'video_motion_score'.", "arguments": " min_score (): The minimum motion score to keep samples.\n max_score (): The maximum motion score to keep samples.\n sampling_fps (typing.Annotated[float, Gt(gt=0)]): The sampling rate in frames_per_second for\n optical flow calculations.\n size (typing.Union[typing.Annotated[int, Gt(gt=0)], typing.Tuple[typing.Annotated[int, Gt(gt=0)]], typing.Tuple[typing.Annotated[int, Gt(gt=0)], typing.Annotated[int, Gt(gt=0)]], NoneType]): Resize frames before computing optical flow. If size is a\n sequence like (h, w), frame size will be matched to this. If size\n is an int, the smaller edge of frames will be matched to this number.\n i.e., if height > width, then the frame will be rescaled to (size *\n height / width, size). Default `None` to keep the original size.\n max_size (typing.Optional[typing.Annotated[int, Gt(gt=0)]]): The maximum size allowed for the longer edge of resized\n frames. If the longer edge of frames is greater than max_size after\n being resized according to size, size will be overruled so that the\n longer edge is equal to max_size. As a result, the smaller edge may\n be shorter than size. This is only supported if size is an int.\n divisible (typing.Annotated[int, Gt(gt=0)]): The number that the dimensions must be divisible by.\n relative (): If `True`, the optical flow magnitude is normalized to\n a [0, 1] range, relative to the frame's diagonal length.\n any_or_all (): keep this sample with 'any' or 'all' strategy of\n all videos. 'any': keep this sample if any videos meet the\n condition. 'all': keep this sample only if all videos meet the\n condition.\n"}, {"index": 60, "class_name": "video_motion_score_raft_filter", "class_desc": "Filter to keep samples with video motion scores within a specified range.\n\n This operator utilizes the RAFT (Recurrent All-Pairs Field Transforms) model from\n torchvision to predict optical flow between video frames. It keeps samples where the\n video motion score is within the given min and max score range. The motion score is\n computed based on the optical flow between frames, which is estimated using the RAFT\n model. 
The operator can sample frames at a specified FPS and apply transformations to\n the frames before computing the flow.\n\n - The RAFT model is used to estimate the optical flow.\n - Frames are preprocessed using a series of transformations including normalization and\n color channel flipping.\n - The motion score is calculated from the optical flow data.\n - The operator can be configured to filter based on any or all frames in the video.\n - The device for model inference (CPU or CUDA) is automatically detected and set.\n\n For further details, refer to the official torchvision documentation:\n https://pytorch.org/vision/main/models/raft.html\n\n The original paper on RAFT is available here:\n https://arxiv.org/abs/2003.12039\n ", "arguments": " min_score ()\n max_score ()\n sampling_fps (typing.Annotated[float, Gt(gt=0)])\n size (typing.Union[typing.Annotated[int, Gt(gt=0)], typing.Tuple[typing.Annotated[int, Gt(gt=0)]], typing.Tuple[typing.Annotated[int, Gt(gt=0)], typing.Annotated[int, Gt(gt=0)]], NoneType])\n max_size (typing.Optional[typing.Annotated[int, Gt(gt=0)]])\n divisible (typing.Annotated[int, Gt(gt=0)])\n relative ()\n any_or_all ()\n"}, {"index": 61, "class_name": "video_nsfw_filter", "class_desc": "Filter to keep samples whose videos have NSFW scores in a specified range.\n\n This operator uses a Hugging Face model to detect NSFW content in video frames. It keeps\n samples where the NSFW score is below a specified threshold. The operator supports two\n frame sampling methods: \"all_keyframes\" and \"uniform\". For \"uniform\", it extracts a\n specified number of frames. The NSFW scores are reduced using one of three modes: \"avg\",\n \"max\", or \"min\". The key metric, 'video_nsfw_score', is computed for each video and\n stored in the sample's stats. The operator can use either an \"any\" or \"all\" strategy to\n decide if a sample should be kept based on the NSFW scores of its videos.", "arguments": " hf_nsfw_model (): NSFW detection model name on Hugging Face.\n trust_remote_code ()\n min_score ()\n max_score (): the NSFW score threshold for samples.\n Ranges from 0 to 1. Samples with an NSFW score less than this threshold\n will be kept.\n frame_sampling_method (): sampling method of extracting frame\n images from the videos.\n Should be one of [\"all_keyframes\", \"uniform\"].\n The former one extracts all key frames (the number of which depends\n on the duration of the video) and the latter one extracts a specified\n number of frames uniformly from the video.\n Default: \"all_keyframes\".\n frame_num (typing.Annotated[int, Gt(gt=0)]): the number of frames to be extracted uniformly from\n the video. Only works when frame_sampling_method is \"uniform\". If\n it's 1, only the middle frame will be extracted. If it's 2, only\n the first and the last frames will be extracted. If it's larger\n than 2, in addition to the first and the last frames, other frames\n will be extracted uniformly within the video duration.\n reduce_mode (): reduce mode for multiple sampled video frames.\n 'avg': Take the average of multiple values\n 'max': Take the max of multiple values\n 'min': Take the min of multiple values\n any_or_all (): keep this sample with 'any' or 'all' strategy of\n all videos. 'any': keep this sample if any videos meet the\n condition. 
'all': keep this sample only if all videos meet the\n condition.\n"}, {"index": 62, "class_name": "video_ocr_area_ratio_filter", "class_desc": "Keep data samples whose detected text area ratios for specified frames in the video are\n within a specified range.\n\n This operator filters data based on the ratio of the detected text area to the total\n frame area. It uses EasyOCR to detect text in the specified languages and calculates the\n area ratio for each sampled frame. The operator then determines whether to keep a sample\n based on the `any` or `all` strategy, which checks if any or all of the videos meet the\n specified area ratio range. The key metric, `video_ocr_area_ratio`, is computed as the\n mean of the text area ratios across the sampled frames. The number of sampled frames and\n the specific frames to be sampled can be configured.", "arguments": " min_area_ratio (): The min OCR area ratio to keep samples. It's 0\n by default.\n max_area_ratio (): The max OCR area ratio to keep samples. It's 1.0\n by default.\n frame_sample_num (typing.Annotated[int, Gt(gt=0)]): The number of sampled frames to calculate the\n OCR area ratio. If it's 1, only the middle frame will be selected. If\n it's 2, only the first and the last frames will be selected. If\n it's larger than 2, in addition to the first and the last frames,\n other frames will be sampled evenly within the video duration.\n languages_to_detect (typing.Union[str, typing.List[str]]): the languages in which texts should be\n detected. Default: ['ch_sim', 'en']. The full language list can be\n found here: https://www.jaided.ai/easyocr/.\n any_or_all (): keep this sample with 'any' or 'all' strategy of\n all videos. 'any': keep this sample if any videos meet the\n condition. 'all': keep this sample only if all videos meet the\n condition.\n"}, {"index": 63, "class_name": "video_resolution_filter", "class_desc": "Keep data samples whose videos' resolutions are within a specified range.\n\n This operator filters data samples based on the resolution of the videos they contain.\n It keeps samples if the video resolutions fall within the defined width and height\n ranges. The filtering strategy can be set to 'any' or 'all':\n - 'any': Keeps the sample if any video meets the resolution criteria.\n - 'all': Keeps the sample only if all videos meet the resolution criteria.\n\n The operator computes and caches the 'video_width' and 'video_height' for each video in\n the sample. If no videos are present, it sets these fields to empty arrays. These cached\n values are used to determine whether to keep or filter out the sample.", "arguments": " min_width (): The min horizontal resolution.\n max_width (): The max horizontal resolution.\n min_height (): The min vertical resolution.\n max_height (): The max vertical resolution.\n any_or_all (): keep this sample with 'any' or 'all' strategy of\n all videos. 'any': keep this sample if any videos meet the\n condition. 'all': keep this sample only if all videos meet the\n condition.\n"}, {"index": 64, "class_name": "human_preference_annotation_mapper", "class_desc": "Operator for human preference annotation using Label Studio.\n\n This operator formats and presents pairs of answers to a prompt for human evaluation. It\n uses a default or custom Label Studio configuration to display the prompt and answer\n options. The operator processes the annotations to determine the preferred answer,\n updating the sample with the chosen and rejected answers. 
The operator requires specific\n keys in the samples for the prompt and answer options. If these keys are missing, it\n logs warnings and uses placeholder text.", "arguments": " label_config_file (): Path to the label config file\n answer1_key (): Key for the first answer\n answer2_key (): Key for the second answer\n prompt_key (): Key for the prompt/question\n chosen_key (): Key for the chosen answer\n rejected_key (): Key for the rejected answer\n"}, {"index": 65, "class_name": "audio_add_gaussian_noise_mapper", "class_desc": "Mapper to add Gaussian noise to audio samples.\n\n This operator adds Gaussian noise to audio data with a specified probability. The\n amplitude of the noise is randomly chosen between `min_amplitude` and `max_amplitude`.\n If `save_dir` is provided, the modified audio files are saved in that directory;\n otherwise, they are saved in the same directory as the input files. The `p` parameter\n controls the probability of applying this transformation to each sample. If no audio is\n present in the sample, it is returned unchanged.", "arguments": " min_amplitude (): float unit: linear amplitude.\n Default: 0.001. Minimum noise amplification factor.\n max_amplitude (): float unit: linear amplitude.\n Default: 0.015. Maximum noise amplification factor.\n p (): float range: [0.0, 1.0]. Default: 0.5.\n The probability of applying this transform.\n save_dir (): str. Default: None.\n The directory where generated audio files will be stored.\n If not specified, outputs will be saved in the same directory as their corresponding input files.\n This path can alternatively be defined by setting the `DJ_PRODUCED_DATA_DIR` environment variable.\n"}, {"index": 66, "class_name": "audio_ffmpeg_wrapped_mapper", "class_desc": "Wraps FFmpeg audio filters for processing audio files in a dataset.\n\n This operator applies specified FFmpeg audio filters to the audio files in the dataset.\n It supports passing custom filter parameters and global arguments to the FFmpeg command\n line. The processed audio files are saved to a specified directory or the same directory\n as the input files if no save directory is provided. The `DJ_PRODUCED_DATA_DIR`\n environment variable can also be used to set the save directory. If no filter name is\n provided, the audio files remain unmodified. The operator updates the source file paths\n in the dataset after processing.", "arguments": " filter_name (typing.Optional[str]): ffmpeg audio filter name.\n filter_kwargs (typing.Optional[typing.Dict]): keyword arguments passed to the ffmpeg filter.\n global_args (typing.Optional[typing.List[str]]): list arguments passed to the ffmpeg command line.\n capture_stderr (): whether to capture stderr.\n overwrite_output (): whether to overwrite the output file.\n save_dir (): The directory where generated audio files will be stored.\n If not specified, outputs will be saved in the same directory as their corresponding input files.\n This path can alternatively be defined by setting the `DJ_PRODUCED_DATA_DIR` environment variable.\n"}, {"index": 67, "class_name": "calibrate_qa_mapper", "class_desc": "Calibrates question-answer pairs based on reference text using an API model.\n\n This operator uses a specified API model to calibrate question-answer pairs, making them\n more detailed and accurate. 
It constructs the input prompt by combining the reference\n text and the question-answer pair, then sends it to the API for calibration. The output\n is parsed to extract the calibrated question and answer. The operator retries the API\n call and parsing up to a specified number of times in case of errors. The default system\n prompt, input templates, and output pattern can be customized. The operator supports\n additional parameters for model initialization and sampling.", "arguments": " api_model (): API model name.\n api_endpoint (typing.Optional[str]): URL endpoint for the API.\n response_path (typing.Optional[str]): Path to extract content from the API response.\n Defaults to 'choices.0.message.content'.\n system_prompt (typing.Optional[str]): System prompt for the calibration task.\n input_template (typing.Optional[str]): Template for building the model input.\n reference_template (typing.Optional[str]): Template for formatting the reference text.\n qa_pair_template (typing.Optional[str]): Template for formatting question-answer pairs.\n output_pattern (typing.Optional[str]): Regular expression for parsing model output.\n try_num (typing.Annotated[int, Gt(gt=0)]): The number of retry attempts when there is an API\n call error or output parsing error.\n model_params (typing.Dict): Parameters for initializing the API model.\n sampling_params (typing.Dict): Extra parameters passed to the API call.\n e.g. {'temperature': 0.9, 'top_p': 0.95}\n"}, {"index": 68, "class_name": "calibrate_query_mapper", "class_desc": "Calibrate query in question-answer pairs based on reference text.\n\n This operator adjusts the query (question) in a question-answer pair to be more detailed\n and accurate, while ensuring it can still be answered by the original answer. It uses a\n reference text to inform the calibration process. The calibration is guided by a system\n prompt, which instructs the model to refine the question without adding extraneous\n information. The output is parsed to extract the calibrated query, with any additional\n content removed.", "arguments": " api_model (): API model name.\n api_endpoint (typing.Optional[str]): URL endpoint for the API.\n response_path (typing.Optional[str]): Path to extract content from the API response.\n Defaults to 'choices.0.message.content'.\n system_prompt (typing.Optional[str]): System prompt for the calibration task.\n input_template (typing.Optional[str]): Template for building the model input.\n reference_template (typing.Optional[str]): Template for formatting the reference text.\n qa_pair_template (typing.Optional[str]): Template for formatting question-answer pairs.\n output_pattern (typing.Optional[str]): Regular expression for parsing model output.\n try_num (typing.Annotated[int, Gt(gt=0)]): The number of retry attempts when there is an API\n call error or output parsing error.\n model_params (typing.Dict): Parameters for initializing the API model.\n sampling_params (typing.Dict): Extra parameters passed to the API call.\n e.g. {'temperature': 0.9, 'top_p': 0.95}\n"}, {"index": 69, "class_name": "calibrate_response_mapper", "class_desc": "Calibrate response in question-answer pairs based on reference text.\n\n This mapper calibrates the 'response' part of a question-answer pair by using a\n reference text. It aims to make the response more detailed and accurate while ensuring\n it still answers the original question. The calibration process uses a default system\n prompt, which can be customized. 
The output is stripped of any leading or trailing\n whitespace.", "arguments": " api_model (): API model name.\n api_endpoint (typing.Optional[str]): URL endpoint for the API.\n response_path (typing.Optional[str]): Path to extract content from the API response.\n Defaults to 'choices.0.message.content'.\n system_prompt (typing.Optional[str]): System prompt for the calibration task.\n input_template (typing.Optional[str]): Template for building the model input.\n reference_template (typing.Optional[str]): Template for formatting the reference text.\n qa_pair_template (typing.Optional[str]): Template for formatting question-answer pairs.\n output_pattern (typing.Optional[str]): Regular expression for parsing model output.\n try_num (typing.Annotated[int, Gt(gt=0)]): The number of retry attempts when there is an API\n call error or output parsing error.\n model_params (typing.Dict): Parameters for initializing the API model.\n sampling_params (typing.Dict): Extra parameters passed to the API call.\n e.g. {'temperature': 0.9, 'top_p': 0.95}\n"}, {"index": 70, "class_name": "chinese_convert_mapper", "class_desc": "Mapper to convert Chinese text between Traditional, Simplified, and Japanese Kanji.\n\n This operator converts Chinese text based on the specified mode. It supports conversions\n between Simplified Chinese, Traditional Chinese (including Taiwan and Hong Kong\n variants), and Japanese Kanji. The conversion is performed using a pre-defined set of\n rules. The available modes include 's2t' for Simplified to Traditional, 't2s' for\n Traditional to Simplified, and other specific variants like 's2tw', 'tw2s', 's2hk',\n 'hk2s', 's2twp', 'tw2sp', 't2tw', 'tw2t', 'hk2t', 't2hk', 't2jp', and 'jp2t'. The\n operator processes text in batches and applies the conversion to the specified text key\n in the samples.", "arguments": " mode (