diff --git a/alias/README.md b/alias/README.md index d4ec513..b7640f0 100644 --- a/alias/README.md +++ b/alias/README.md @@ -371,6 +371,85 @@ After the first startup, you can log in with the superuser credentials configure - **Password**: As specified in `FIRST_SUPERUSER_PASSWORD` +### 🌐 Basic Usage -- AgentScope Runtime Deployment + +Alias is now fully compatible with [AgentScope Runtime](https://github.com/agentscope-ai/agentscope-runtime/), enabling you to quickly deploy Alias as a standardized backend service. Once launched, you can easily invoke Alias capabilities via the accompanying AgentScope Runtime API. + +#### 1. Prerequisites + +* **Sandbox & API Keys**: Please refer to the previous sections [🐳 Sandbox Setup (Optional)](#-sandbox-setup-optional) and [🔑 API Keys Configuration](#-api-keys-configuration) to complete the basic environment setup. +* **Environment Variables**: Copy the example environment file from the project root: + ```bash + cp .env.example .env + ``` +* **Start Redis**: Required for caching and session management: + ```bash + docker run -d -p 6379:6379 --name alias-redis redis:7-alpine + ``` + +#### 2. Installation & Sandbox Launch + +Install the package in editable mode from the project root. This will automatically install the `alias_agent_runtime` CLI tool: +```bash +pip install -e . +``` + +To ensure proper code execution and file operations, start the sandbox server in a separate terminal: +```bash +runtime-sandbox-server --extension src/alias/runtime/alias_sandbox/alias_sandbox.py +``` + +#### 3. Launching AgentScope Runtime Service + +You can choose to start the service via the CLI or Python code, depending on your use case. + +##### Option A: Using CLI (Recommended) +Use the `alias_agent_runtime` command to launch the backend service with one click: + +```bash +alias_agent_runtime --host 127.0.0.1 --port 8090 --chat-mode general +``` + +**Parameter Descriptions**: +* `--host` / `--port`: Specify the service address and port (default port is 8090). +* `--chat-mode`: Set the running mode. Options: `general`, `dr`, `browser`, `ds`, `finance` (default: `general`). +* `--web-ui`: (Optional) Enable AgentScope Runtime WebUI for a visual interaction interface. Skip this if you only need the API. + +> **Note**: When enabling `--web-ui` for the first time, the system will automatically install necessary frontend dependencies. This may take a few minutes. + +##### Option B: Using Python Code (Recommended for Developers) +If you wish to integrate or customize the launch logic within Python, you can use `AliasRunner` and `AgentApp` as shown below: + +```python +from agentscope_runtime.engine.app import AgentApp +from alias.server.runtime.runner.alias_runner import AliasRunner + +# 1. Initialize AliasRunner +# default_chat_mode options: "general", "dr", "browser", "ds", "finance" +runner = AliasRunner( + default_chat_mode="general", +) + +# 2. Create AgentApp instance +agent_app = AgentApp( + runner=runner, + app_name="Alias", + app_description="An LLM-empowered agent built on AgentScope and AgentScope-Runtime", +) + +# 3. Run the service +# Set web_ui=True to enable the visual debugging interface +agent_app.run(host="127.0.0.1", port=8090) +``` + +#### 4. Accessing the Application + +Once the service is running, you can access Alias via: + +* **Runtime API Access**: Send standard HTTP POST requests to `http://localhost:8090/process`. This is the primary method for integrating Alias into third-party frontends or backend workflows. +* **Visual Monitoring (Optional)**: If started with the `--web-ui` flag, visit `http://localhost:5173`. This interface allows developers to observe the agent's reasoning process, tool execution traces, and other debugging information. + + ## ⚖️ License Alias-Agent is released under the **Apache 2.0 License** – see the [LICENSE](https://github.com/agentscope-ai/agentscope-samples/blob/main/LICENSE) file for details. diff --git a/alias/README_ZH.md b/alias/README_ZH.md index a3ff850..f2bc6d0 100644 --- a/alias/README_ZH.md +++ b/alias/README_ZH.md @@ -371,6 +371,83 @@ bash script/start_memory_service.sh - **用户名**:如 `FIRST_SUPERUSER_USERNAME` 所指定 (默认: `alias`) - **密码**:如 `FIRST_SUPERUSER_PASSWORD` 所指定 +### 🌐 基础用法 -- AgentScope Runtime 部署 + +Alias 现已适配 [AgentScope Runtime](https://github.com/agentscope-ai/agentscope-runtime/),您可以利用 AgentScope Runtime 将 Alias 快速部署为标准后端服务。启动后,通过配套的 AgentScope Runtime API 即可轻松调用 Alias 所提供的服务。 + +#### 1. 前期准备 + +* **沙盒设置与 API 密钥**:请参考前文的 [🐳 沙盒设置](#-沙盒设置可选) 和 [🔑 API 密钥配置](#-api-密钥配置) 完成基础环境配置。 +* **配置环境变量**:从项目根目录复制示例环境文件: + ```bash + cp .env.example .env + ``` +* **启动 Redis**:缓存和会话管理所需: + ```bash + docker run -d -p 6379:6379 --name alias-redis redis:7-alpine + ``` + +#### 2. 安装与沙盒启动 + +在项目根目录下,以开发模式安装包,这将自动安装 `alias_agent_runtime` 命令行工具: +```bash +pip install -e . +``` + +为了确保代码执行和文件操作等功能正常,请在另一个终端启动沙盒服务器: +```bash +runtime-sandbox-server --extension src/alias/runtime/alias_sandbox/alias_sandbox.py +``` + +#### 3. 启动 AgentScope Runtime 服务 + +您可以根据使用场景,选择通过命令行或 Python 代码启动服务。 + +##### 选项 A:使用命令行工具(推荐) +使用 `alias_agent_runtime` 命令一键启动后端服务: + +```bash +alias_agent_runtime --host 127.0.0.1 --port 8090 --chat-mode general +``` + +**参数说明**: +* `--host` / `--port`: 指定服务的运行地址和端口(默认端口为 8090)。 +* `--chat-mode`: 设置运行模式,可选 `general`, `dr`, `browser`, `ds`, `finance`(默认为 `general`)。 +* `--web-ui` : (可选) 启用 AgentScope Runtime WebUI 以开启可视化交互界面。若仅需调用 API,请忽略此参数。 + +> **注意**:首次启动并开启 `--web-ui` 时,系统会自动安装必要的前端依赖包,可能需要花费几分钟时间,请耐心等待。 + +##### 选项 B:使用代码启动(开发者推荐) +如果您希望在 Python 代码中集成或自定义启动逻辑,可以参考以下示例,结合 `AliasRunner` 和 `AgentApp`: + +```python +from agentscope_runtime.engine.app import AgentApp +from alias.server.runtime.runner.alias_runner import AliasRunner + +# 1. 初始化 AliasRunner +# default_chat_mode 可选: "general", "dr", "browser", "ds", "finance" +runner = AliasRunner( + default_chat_mode="general", +) + +# 2. 创建 AgentApp 实例 +agent_app = AgentApp( + runner=runner, + app_name="Alias", + app_description="An LLM-empowered agent built on AgentScope and AgentScope-Runtime", +) + +# 3. 运行服务 +# 如需启用可视化调试界面,可设置 web_ui=True +agent_app.run(host="127.0.0.1", port=8090) +``` + +#### 4. 访问应用程序 + +服务启动后,您可以通过以下方式访问 Alias: + +* **Runtime API 调用**:通过标准 HTTP POST 请求访问 `http://localhost:8090/process`。这是将 Alias 集成至第三方前端或后端工作流的主要方式。 +* **可视化监控 (可选)**:若启动时开启了 `--web-ui` 参数,可通过 `http://localhost:5173` 访问 WebUI。该界面主要用于开发者观察智能体的思考过程以及工具调用轨迹等调试信息。 ## ⚖️ 许可证 diff --git a/alias/pyproject.toml b/alias/pyproject.toml index e9a1a28..9c00abd 100644 --- a/alias/pyproject.toml +++ b/alias/pyproject.toml @@ -64,4 +64,5 @@ dev = [ ] [project.scripts] -alias_agent = "alias.cli:main" \ No newline at end of file +alias_agent = "alias.cli:main" +alias_agent_runtime = "alias.server.alias_agent_app:main" diff --git a/alias/src/alias/runtime/__init__.py b/alias/src/alias/runtime/__init__.py index c508444..de9ef7a 100644 --- a/alias/src/alias/runtime/__init__.py +++ b/alias/src/alias/runtime/__init__.py @@ -1,7 +1,8 @@ # -*- coding: utf-8 -*- """Runtime module for Alias""" -__all__ = ["alias_sandbox"] +__all__ = ["alias_sandbox", "runtime_compat"] # Import submodule to make it accessible via alias.runtime.alias_sandbox from . import alias_sandbox # noqa: E402, F401 +from . import runtime_compat # noqa: E402, F401 diff --git a/alias/src/alias/runtime/runtime_compat/__init__.py b/alias/src/alias/runtime/runtime_compat/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/alias/src/alias/runtime/runtime_compat/adapter/__init__.py b/alias/src/alias/runtime/runtime_compat/adapter/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/alias/src/alias/runtime/runtime_compat/adapter/alias_stream_adapter.py b/alias/src/alias/runtime/runtime_compat/adapter/alias_stream_adapter.py new file mode 100644 index 0000000..ac1bf76 --- /dev/null +++ b/alias/src/alias/runtime/runtime_compat/adapter/alias_stream_adapter.py @@ -0,0 +1,203 @@ +# -*- coding: utf-8 -*- +import json +from typing import Any, AsyncIterator, Dict, Optional, Union + +from agentscope_runtime.engine.helpers.agent_api_builder import ResponseBuilder +from agentscope_runtime.engine.schemas.agent_schemas import ( + Content, + ContentType, + FunctionCall, + FunctionCallOutput, + Message, + MessageType, + Role, +) + + +def _try_deep_parse(val: Any) -> Any: + """ + Recursively parse JSON-like strings into native Python objects. + """ + if isinstance(val, str): + content = val.strip() + if (content.startswith("{") and content.endswith("}")) or ( + content.startswith("[") and content.endswith("]") + ): + try: + parsed = json.loads(content) + return _try_deep_parse(parsed) + except Exception: + # If nested JSON parsing fails, treat it as a normal string. + return val + return val + if isinstance(val, list): + return [_try_deep_parse(i) for i in val] + if isinstance(val, dict): + return {k: _try_deep_parse(v) for k, v in val.items()} + return val + + +def _ensure_safe_json_string(val: Any) -> str: + """ + Serialize content into a valid JSON string suitable for WebUI parsing. + """ + parsed_val = _try_deep_parse(val) + if parsed_val is None: + return "{}" + return json.dumps(parsed_val, ensure_ascii=False) + + +def _extract_alias_output_obj(content_str: str) -> Any: + """ + Extract the `output` object from Alias nested tool-result content. + """ + try: + data = json.loads(content_str) + if isinstance(data, list) and data: + return data[0].get("output") + except Exception: + # Best-effort parse: if the string is not a valid + # JSON or doesn't follow the expected structure, + # fall back to returning the original string. + pass + return content_str + + +class AliasAdapterState: + def __init__( + self, + message_builder: Any, + content_builder: Any, + runtime_type: str, + ): + self.mb = message_builder + self.cb = content_builder + self.runtime_type = runtime_type + self.last_content = "" + self.is_completed = False + + +async def adapt_alias_message_stream( + source_stream: AsyncIterator[Dict[str, Any]], +) -> AsyncIterator[Union[Message, Content]]: + # pylint: disable=too-many-branches, too-many-statements + rb = ResponseBuilder() + state_map: Dict[str, AliasAdapterState] = {} + last_active_key: Optional[str] = None + + yield rb.created() + yield rb.in_progress() + + async for chunk in source_stream: + if not isinstance(chunk, dict) or "data" not in chunk: + continue + + messages = chunk["data"].get("messages") or [] + for item in messages: + alias_id = item.get("id") + inner_msg = item.get("message") or {} + + alias_type = inner_msg.get("type") + alias_status = inner_msg.get("status") + tool_call_id = inner_msg.get("tool_call_id") or alias_id + + if alias_type in ["thought", "sub_thought"]: + runtime_type = MessageType.REASONING + target_role = Role.ASSISTANT + elif alias_type in ["tool_call", "tool_use"]: + runtime_type = MessageType.PLUGIN_CALL + target_role = Role.ASSISTANT + elif alias_type == "tool_result": + runtime_type = MessageType.PLUGIN_CALL_OUTPUT + target_role = Role.TOOL + else: + runtime_type = MessageType.MESSAGE + target_role = Role.ASSISTANT + + state_key = f"{tool_call_id}_{runtime_type}" + + if last_active_key and last_active_key != state_key: + old_state = state_map.get(last_active_key) + if old_state and not old_state.is_completed: + yield old_state.cb.complete() + yield old_state.mb.complete() + old_state.is_completed = True + + last_active_key = state_key + + if state_key not in state_map: + mb = rb.create_message_builder(role=target_role) + mb.message.type = runtime_type + yield mb.get_message_data() + + if runtime_type in [ + MessageType.PLUGIN_CALL, + MessageType.PLUGIN_CALL_OUTPUT, + ]: + c_type = ContentType.DATA + else: + c_type = ContentType.TEXT + + cb = mb.create_content_builder(content_type=c_type) + state_map[state_key] = AliasAdapterState(mb, cb, runtime_type) + + state = state_map[state_key] + + if runtime_type in [MessageType.MESSAGE, MessageType.REASONING]: + raw_text = str(inner_msg.get("content") or "") + + if alias_type == "files" and "files" in inner_msg: + raw_text = "\n".join( + [ + f"📁 [{f['filename']}]({f['url']})" + for f in inner_msg["files"] + ], + ) + + if raw_text.startswith(state.last_content): + delta = raw_text[len(state.last_content) :] + if delta: + yield state.cb.add_text_delta(delta) + state.last_content = raw_text + else: + yield state.cb.set_text(raw_text) + state.last_content = raw_text + + elif runtime_type == MessageType.PLUGIN_CALL: + args = inner_msg.get("arguments") or {} + fc = FunctionCall( + call_id=tool_call_id, + name=inner_msg.get("tool_name") or "tool", + arguments=_ensure_safe_json_string(args), + ) + yield state.cb.set_data(fc.model_dump()) + + elif runtime_type == MessageType.PLUGIN_CALL_OUTPUT: + output_obj = _extract_alias_output_obj( + inner_msg.get("content", ""), + ) + fco = FunctionCallOutput( + call_id=tool_call_id, + name=inner_msg.get("tool_name") or "tool", + output=_ensure_safe_json_string(output_obj), + ) + yield state.cb.set_data(fco.model_dump()) + + if alias_status == "finished" and not state.is_completed: + yield state.cb.complete() + yield state.mb.complete() + state.is_completed = True + + for state in state_map.values(): + if not state.is_completed: + try: + yield state.cb.complete() + yield state.mb.complete() + state.is_completed = True + except Exception: + # Graceful cleanup: ignore errors during the + # finalization phase to ensure the main response + # stream can finish without crashing. + pass + + yield rb.completed() diff --git a/alias/src/alias/runtime/runtime_compat/runner/__init__.py b/alias/src/alias/runtime/runtime_compat/runner/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/alias/src/alias/runtime/runtime_compat/runner/alias_runner.py b/alias/src/alias/runtime/runtime_compat/runner/alias_runner.py new file mode 100644 index 0000000..639bf85 --- /dev/null +++ b/alias/src/alias/runtime/runtime_compat/runner/alias_runner.py @@ -0,0 +1,371 @@ +# -*- coding: utf-8 -*- +# pylint: disable=unused-argument +from __future__ import annotations + +import asyncio +import uuid +from typing import Any, AsyncGenerator, Dict, Optional, Union + +from fastapi_limiter import FastAPILimiter +from pydantic import ValidationError + +from agentscope_runtime.engine.runner import Runner +from agentscope_runtime.engine.schemas.agent_schemas import ( + AgentRequest, + AgentResponse, + Error, + RunStatus, + SequenceNumberGenerator, +) + +from alias.server.db.init_db import ( + close_database, + initialize_database, + session_scope, +) +from alias.server.core.task_manager import task_manager +from alias.server.exceptions.base import BaseError +from alias.runtime.runtime_compat.adapter.alias_stream_adapter import ( + adapt_alias_message_stream, +) +from alias.server.schemas.chat import ChatRequest +from alias.server.services.chat_service import ChatService +from alias.server.services.conversation_service import ConversationService +from alias.server.utils.logger import setup_logger +from alias.server.utils.redis import redis_client + + +class AliasRunner(Runner): + FRAMEWORK_TYPE = "Alias" + + def __init__( + self, + default_chat_mode: str = "general", + default_conv_name: str = "webui", + ) -> None: + super().__init__() + self.framework_type = self.FRAMEWORK_TYPE + self.default_chat_mode = default_chat_mode + self.default_conv_name = default_conv_name + + self._session_conv_cache: Dict[str, uuid.UUID] = {} + + async def stop(self) -> None: + if not getattr(self, "_health", False): + return + await super().stop() + + async def query_handler(self, *args: Any, **kwargs: Any) -> Any: + user_id: uuid.UUID = kwargs["user_id"] + conversation_id: uuid.UUID = kwargs["conversation_id"] + chat_request: ChatRequest = kwargs["chat_request"] + task_id: uuid.UUID = kwargs.get("task_id") or uuid.uuid4() + + service = ChatService() + response_gen = await service.chat( + user_id=user_id, + conversation_id=conversation_id, + chat_request=chat_request, + task_id=task_id, + ) + return response_gen + + async def init_handler(self, *args: Any, **kwargs: Any) -> None: + print("🚀 Starting Alias API Server...") + setup_logger() + + await initialize_database() + await task_manager.start() + + await redis_client.ping() + try: + await FastAPILimiter.init(redis_client) + except Exception as exc: + print(f"redis init error: {str(exc)}") + + print("✅ Alias startup complete.") + + async def shutdown_handler(self, *args: Any, **kwargs: Any) -> None: + print("Executing Alias shutdown logic...") + await task_manager.stop() + await close_database() + print("Alias shutdown complete.") + + @staticmethod + def _extract_text_from_agent_request(req_dict: Dict[str, Any]) -> str: + agent_input = req_dict.get("input") + if isinstance(agent_input, str): + return agent_input + + if isinstance(agent_input, list) and agent_input: + last = agent_input[-1] + if isinstance(last, dict): + content = last.get("content") + if isinstance(content, str): + return content + if isinstance(content, list): + for blk in reversed(content): + if isinstance(blk, dict) and blk.get("type") == "text": + return blk.get("text") or "" + if "text" in last and isinstance(last["text"], str): + return last["text"] + return "" + + @staticmethod + def _to_uuid(val: Any) -> Optional[uuid.UUID]: + if val is None: + return None + if isinstance(val, uuid.UUID): + return val + try: + return uuid.UUID(str(val)) + except Exception: + return None + + @staticmethod + def _stable_uuid_from_string(s: str) -> uuid.UUID: + return uuid.uuid5(uuid.NAMESPACE_DNS, f"alias::{s}") + + async def _get_or_create_conversation_id( + self, + session_id: str, + user_uuid: uuid.UUID, + ) -> uuid.UUID: + if session_id in self._session_conv_cache: + return self._session_conv_cache[session_id] + + async with session_scope() as session: + service = ConversationService(session=session) + conversation = await service.create_conversation( + user_id=user_uuid, + name=self.default_conv_name, + description="created by AgentScope Runtime WebUI", + chat_mode=self.default_chat_mode, + ) + + conv_id = getattr(conversation, "id", None) + conv_id = ( + conv_id + if isinstance(conv_id, uuid.UUID) + else self._to_uuid(conv_id) + ) + if conv_id is None: + raise RuntimeError( + "ConversationService.create_conversation() " + "returned invalid id: " + f"{conversation}", + ) + + self._session_conv_cache[session_id] = conv_id + return conv_id + + async def stream_query_native( + self, + request: Union[AgentRequest, dict], + **kwargs: Any, + ) -> AsyncGenerator[Any, None]: + if not self._health: + raise RuntimeError( + "Runner has not been started. Please call " + "'await runner.start()' or use 'async with Runner()' " + "before calling 'stream_query'.", + ) + + req_dict = ( + request if isinstance(request, dict) else request.model_dump() + ) + user_id = kwargs.get("user_id") or self._to_uuid( + req_dict.get("user_id"), + ) + conversation_id = kwargs.get("conversation_id") or self._to_uuid( + req_dict.get("conversation_id"), + ) + task_id = ( + kwargs.get("task_id") + or self._to_uuid(req_dict.get("task_id")) + or uuid.uuid4() + ) + + if user_id is None or conversation_id is None: + yield { + "error": "missing_context", + "code": 422, + "message": ( + "Native mode requires user_id and conversation_id " + "in kwargs or request body." + ), + } + return + + try: + chat_request_obj = ChatRequest.model_validate(req_dict) + except ValidationError as exc: + yield { + "error": "invalid_request", + "code": 422, + "message": "ChatRequest validation failed", + "detail": exc.errors(), + } + return + except Exception as exc: + yield { + "error": "invalid_request", + "code": 500, + "message": str(exc), + } + return + + try: + result = self.query_handler( + user_id=user_id, + conversation_id=conversation_id, + task_id=task_id, + chat_request=chat_request_obj, + ) + if asyncio.iscoroutine(result): + result = await result + + async for chunk in result: + yield chunk + + except Exception as exc: + if isinstance(exc, BaseError): + yield {"error": exc.message, "code": exc.code} + else: + yield { + "error": str(exc), + "code": 500, + "error_type": exc.__class__.__name__, + } + return + + yield "[DONE]" + + async def stream_query( + self, + request: Union[AgentRequest, dict], + **kwargs: Any, + ) -> AsyncGenerator[Any, None]: + # pylint: disable=too-many-branches + # pylint: disable=too-many-statements + if not self._health: + raise RuntimeError( + "Runner has not been started. Please call " + "'await runner.start()' or use 'async with Runner()' " + "before calling 'stream_query'.", + ) + + if isinstance(request, AgentRequest): + req_dict = request.model_dump() + elif isinstance(request, dict): + req_dict = request + else: + if hasattr(request, "model_dump"): + req_dict = request.model_dump() + else: + req_dict = dict(request) + + request_id = req_dict.get("id") or str(uuid.uuid4()) + session_id = req_dict.get("session_id") or f"session_{uuid.uuid4()}" + seq_gen = SequenceNumberGenerator() + + response = AgentResponse(id=request_id) + response.session_id = session_id + yield seq_gen.yield_with_sequence(response) + + response.in_progress() + yield seq_gen.yield_with_sequence(response) + + user_text = self._extract_text_from_agent_request(req_dict) + if not user_text: + err = Error( + code="422", + message="Empty input text in AgentRequest.input.", + ) + yield seq_gen.yield_with_sequence(response.failed(err)) + return + + raw_user_id = req_dict.get("user_id") or session_id + user_uuid = self._to_uuid( + raw_user_id, + ) or self._stable_uuid_from_string( + str(raw_user_id), + ) + + conversation_id = self._to_uuid(req_dict.get("conversation_id")) + if conversation_id is None: + try: + conversation_id = await self._get_or_create_conversation_id( + session_id=session_id, + user_uuid=user_uuid, + ) + except Exception as exc: + err = Error( + code="500", + message=f"Failed to create conversation: {exc}", + ) + yield seq_gen.yield_with_sequence(response.failed(err)) + return + + task_id = self._to_uuid(req_dict.get("task_id")) or uuid.uuid4() + + try: + req_chat_mode = req_dict.get("chat_mode") or self.default_chat_mode + + chat_request_obj = ChatRequest.model_validate( + { + "query": user_text, + "chat_mode": req_chat_mode, + }, + ) + except ValidationError as exc: + err = Error( + code="422", + message=f"ChatRequest validation failed: {exc}", + ) + yield seq_gen.yield_with_sequence(response.failed(err)) + return + + try: + result = self.query_handler( + user_id=user_uuid, + conversation_id=conversation_id, + task_id=task_id, + chat_request=chat_request_obj, + ) + if asyncio.iscoroutine(result): + result = await result + + async for event in adapt_alias_message_stream(result): + try: + if ( + getattr(event, "status", None) == RunStatus.Completed + and getattr(event, "object", None) == "message" + ): + response.add_new_message(event) + except Exception: + # Best-effort bookkeeping + pass + + yield seq_gen.yield_with_sequence(event) + + except Exception as exc: + if isinstance(exc, BaseError): + err = Error(code=str(exc.code), message=exc.message) + else: + err = Error( + code="500", + message=f"Error happens in `query_handler`: {exc}", + ) + yield seq_gen.yield_with_sequence(response.failed(err)) + return + + try: + if response.output: + response.usage = response.output[-1].usage + except IndexError: + # Avoid empty message + pass + + yield seq_gen.yield_with_sequence(response.completed()) + return diff --git a/alias/src/alias/runtime/runtime_compat/runner/alias_runner_singleton.py b/alias/src/alias/runtime/runtime_compat/runner/alias_runner_singleton.py new file mode 100644 index 0000000..6089313 --- /dev/null +++ b/alias/src/alias/runtime/runtime_compat/runner/alias_runner_singleton.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- +import asyncio +from typing import Optional +from alias.runtime.runtime_compat.runner.alias_runner import AliasRunner + +_lock: Optional[asyncio.Lock] = None +_runner: Optional[AliasRunner] = None + + +async def get_alias_runner() -> AliasRunner: + global _lock, _runner + + if _runner is not None: + return _runner + + if _lock is None: + _lock = asyncio.Lock() + + async with _lock: + if _runner is not None: + return _runner + runner = AliasRunner() + await runner.start() + _runner = runner + return _runner diff --git a/alias/src/alias/server/alias_agent_app.py b/alias/src/alias/server/alias_agent_app.py new file mode 100644 index 0000000..bc5c341 --- /dev/null +++ b/alias/src/alias/server/alias_agent_app.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- +from agentscope_runtime.engine.app import AgentApp + +from alias.runtime.runtime_compat.runner.alias_runner import AliasRunner + +PORT = 8090 + + +def run_app( + host: str = "127.0.0.1", + port: int = PORT, + web_ui: bool = False, + chat_mode: str = "general", +) -> None: + agent_app = AgentApp( + runner=AliasRunner( + default_chat_mode=chat_mode, + ), + app_name="Alias", + app_description=( + "An LLM-empowered agent built on AgentScope and AgentScope-Runtime" + ), + ) + agent_app.run(host=host, port=port, web_ui=web_ui) + + +def main() -> None: + import argparse + + parser = argparse.ArgumentParser(prog="alias_agent_runtime") + parser.add_argument("--host", default="127.0.0.1") + parser.add_argument("--port", type=int, default=PORT) + parser.add_argument( + "--web-ui", + action="store_true", + help="Start AgentScope Runtime WebUI (default: False)", + ) + parser.add_argument( + "--chat-mode", + default="general", + choices=["general", "dr", "browser", "ds", "finance"], + help=( + "Default chat mode used by AliasRunner when request doesn't " + "specify chat_mode." + ), + ) + args = parser.parse_args() + + print( + "[alias_agent_runtime] config:", + f"host={args.host}", + f"port={args.port}", + f"web_ui={args.web_ui}", + f"chat_mode={args.chat_mode}", + ) + + run_app( + host=args.host, + port=args.port, + web_ui=args.web_ui, + chat_mode=args.chat_mode, + ) + + +if __name__ == "__main__": + main() diff --git a/alias/src/alias/server/api/v1/__init__.py b/alias/src/alias/server/api/v1/__init__.py index 0dce93f..c8c28d9 100644 --- a/alias/src/alias/server/api/v1/__init__.py +++ b/alias/src/alias/server/api/v1/__init__.py @@ -5,7 +5,18 @@ from alias.server.api.v1.auth import router as auth_router from alias.server.api.v1.conversation import ( router as conversation_router, ) -from alias.server.api.v1.chat import router as chat_router + +# Optional backend switch: this import targets Alias's +# original FastAPI-based API router. +# Keep it for users who want to revert/switch back from +# the current AgentScope-Runtime implementation below. +# from alias.server.api.v1.chat import router as chat_router + +# Current default: +# AgentScope-Runtime-based API router (functionally equivalent +# to the FastAPI router above). +from alias.server.api.v1.chat_runtime import router as chat_router + from alias.server.api.v1.file import router as file_router from alias.server.api.v1.inner import router as inner_router from alias.server.api.v1.share import router as share_router diff --git a/alias/src/alias/server/api/v1/chat_runtime.py b/alias/src/alias/server/api/v1/chat_runtime.py new file mode 100644 index 0000000..a4e8b7e --- /dev/null +++ b/alias/src/alias/server/api/v1/chat_runtime.py @@ -0,0 +1,154 @@ +# -*- coding: utf-8 -*- +# pylint: disable=unused-argument +import json +import uuid +from typing import Any, AsyncIterator + +from fastapi import APIRouter +from fastapi.responses import StreamingResponse +from loguru import logger +from starlette.types import Receive + +from alias.server.api.deps import CurrentUser +from alias.server.exceptions.base import BaseError +from alias.server.schemas.chat import ( + ChatRequest, + StopChatPayload, + StopChatResponse, +) +from alias.server.services.chat_service import ChatService +from alias.server.utils.request_context import request_context_var +from alias.runtime.runtime_compat.runner.alias_runner import AliasRunner + +router = APIRouter(prefix="/conversations", tags=["conversations/chat"]) + + +class EnhancedStreamingResponse(StreamingResponse): + """ + StreamingResponse with client disconnect handling. + """ + + def __init__( + self, + content: Any, + user_id: uuid.UUID, + task_id: uuid.UUID, + *args: Any, + **kwargs: Any, + ) -> None: + super().__init__(content, *args, **kwargs) + self.user_id = user_id + self.task_id = task_id + + async def listen_for_disconnect(self, receive: Receive) -> None: + while True: + message = await receive() + if message["type"] == "http.disconnect": + logger.warning( + f"Chat stopped by disconnect from client: " + f"task_id={self.task_id}", + ) + service = ChatService() + await service.stop_chat( + user_id=self.user_id, + task_id=self.task_id, + ) + break + + +def _to_raw_sse_event(data: Any) -> str: + """ + Convert a chunk from runner.stream_query_native into + a raw SSE event string. + """ + if data == "[DONE]": + return "data: [DONE]\n\n" + + if hasattr(data, "model_dump"): + data = data.model_dump() + + return f"data: {json.dumps(data, ensure_ascii=False)}\n\n" + + +async def event_generator( + runner: AliasRunner, + request_dict: dict, + **runner_kwargs: Any, +) -> AsyncIterator[str]: + """ + Convert AliasRunner.stream_query_native output into + a raw SSE string stream. + """ + try: + async for chunk in runner.stream_query_native( + request_dict, + **runner_kwargs, + ): + yield _to_raw_sse_event(chunk) + except Exception as e: + if not isinstance(e, BaseError): + e = BaseError(code=500, message=str(e)) + error_data = { + "code": e.code, + "message": e.message, + } + yield _to_raw_sse_event(error_data) + yield _to_raw_sse_event("[DONE]") + + +@router.post("/{conversation_id}/chat") +async def chat( + current_user: CurrentUser, + conversation_id: uuid.UUID, + chat_request: ChatRequest, +) -> EnhancedStreamingResponse: + """Run chat via AliasRunner and stream results as SSE.""" + request_context = request_context_var.get() + request_id = request_context.request_id + task_id = uuid.UUID(request_id) if request_id else uuid.uuid4() + user_id = current_user.id + + from alias.runtime.runtime_compat.runner.alias_runner_singleton import ( + get_alias_runner, + ) + + runner = await get_alias_runner() + + request_dict = chat_request.model_dump() + + return EnhancedStreamingResponse( + event_generator( + runner, + request_dict, + user_id=user_id, + conversation_id=conversation_id, + task_id=task_id, + ), + media_type="text/event-stream", + user_id=user_id, + task_id=task_id, + ) + + +@router.post( + "/{conversation_id}/chat/{task_id}/stop", + response_model=StopChatResponse, +) +async def stop_chat( + current_user: CurrentUser, + conversation_id: uuid.UUID, + task_id: uuid.UUID, +) -> StopChatResponse: + service = ChatService() + await service.stop_chat( + user_id=current_user.id, + task_id=task_id, + ) + return StopChatResponse( + status=True, + message="Stop chat successfully.", + payload=StopChatPayload( + conversation_id=conversation_id, + task_id=task_id, + ), + )