feat(alias): add AgentScope-Runtime support (#95)

This commit is contained in:
XiuShenAl
2026-01-20 12:23:45 +08:00
committed by GitHub
parent e7f1fdf7ff
commit b843abea93
13 changed files with 991 additions and 3 deletions


@@ -371,6 +371,85 @@ After the first startup, you can log in with the superuser credentials configure
- **Password**: As specified in `FIRST_SUPERUSER_PASSWORD`
### 🌐 Basic Usage -- AgentScope Runtime Deployment
Alias is now fully compatible with [AgentScope Runtime](https://github.com/agentscope-ai/agentscope-runtime/), enabling you to quickly deploy Alias as a standardized backend service. Once launched, you can easily invoke Alias capabilities via the accompanying AgentScope Runtime API.
#### 1. Prerequisites
* **Sandbox & API Keys**: Please refer to the previous sections [🐳 Sandbox Setup (Optional)](#-sandbox-setup-optional) and [🔑 API Keys Configuration](#-api-keys-configuration) to complete the basic environment setup.
* **Environment Variables**: Copy the example environment file from the project root:
```bash
cp .env.example .env
```
* **Start Redis**: Required for caching and session management:
```bash
docker run -d -p 6379:6379 --name alias-redis redis:7-alpine
```
#### 2. Installation & Sandbox Launch
Install the package in editable mode from the project root. This will automatically install the `alias_agent_runtime` CLI tool:
```bash
pip install -e .
```
To ensure proper code execution and file operations, start the sandbox server in a separate terminal:
```bash
runtime-sandbox-server --extension src/alias/runtime/alias_sandbox/alias_sandbox.py
```
#### 3. Launching AgentScope Runtime Service
You can choose to start the service via the CLI or Python code, depending on your use case.
##### Option A: Using CLI (Recommended)
Use the `alias_agent_runtime` command to launch the backend service with one click:
```bash
alias_agent_runtime --host 127.0.0.1 --port 8090 --chat-mode general
```
**Parameter Descriptions**:
* `--host` / `--port`: Specify the service address and port (default port is 8090).
* `--chat-mode`: Set the running mode. Options: `general`, `dr`, `browser`, `ds`, `finance` (default: `general`).
* `--web-ui`: (Optional) Enable AgentScope Runtime WebUI for a visual interaction interface. Skip this if you only need the API.
> **Note**: When enabling `--web-ui` for the first time, the system will automatically install necessary frontend dependencies. This may take a few minutes.
##### Option B: Using Python Code (Recommended for Developers)
If you wish to integrate or customize the launch logic within Python, you can use `AliasRunner` and `AgentApp` as shown below:
```python
from agentscope_runtime.engine.app import AgentApp
from alias.runtime.runtime_compat.runner.alias_runner import AliasRunner

# 1. Initialize AliasRunner
# default_chat_mode options: "general", "dr", "browser", "ds", "finance"
runner = AliasRunner(
    default_chat_mode="general",
)

# 2. Create an AgentApp instance
agent_app = AgentApp(
    runner=runner,
    app_name="Alias",
    app_description="An LLM-empowered agent built on AgentScope and AgentScope-Runtime",
)

# 3. Run the service
# Set web_ui=True to enable the visual debugging interface
agent_app.run(host="127.0.0.1", port=8090)
```
#### 4. Accessing the Application
Once the service is running, you can access Alias via:
* **Runtime API Access**: Send standard HTTP POST requests to `http://localhost:8090/process`. This is the primary method for integrating Alias into third-party frontends or backend workflows.
* **Visual Monitoring (Optional)**: If started with the `--web-ui` flag, visit `http://localhost:5173`. This interface allows developers to observe the agent's reasoning process, tool execution traces, and other debugging information.
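As a quick sanity check of the Runtime API, you can build a request payload and decode the streamed `data:` lines from Python. The sketch below is illustrative, not part of the project: the `input` message shape mirrors what `AliasRunner._extract_text_from_agent_request` accepts and `chat_mode` is read from the request body by `stream_query`, but the helper names (`build_process_payload`, `parse_sse_line`) and the exact event fields returned by `/process` are assumptions.

```python
import json
from typing import Any, Optional


def build_process_payload(
    text: str,
    session_id: str,
    chat_mode: str = "general",
) -> dict:
    """Build a minimal AgentRequest-style body for POST /process (hypothetical helper)."""
    return {
        "session_id": session_id,
        "chat_mode": chat_mode,  # "general", "dr", "browser", "ds", "finance"
        "input": [
            {
                "role": "user",
                "content": [{"type": "text", "text": text}],
            },
        ],
    }


def parse_sse_line(line: str) -> Optional[Any]:
    """Decode one `data: ...` line from the event stream.

    Returns None for blank or non-data lines, the sentinel string
    "[DONE]" at end of stream, and the parsed JSON object otherwise.
    """
    line = line.strip()
    if not line.startswith("data:"):
        return None
    body = line[len("data:"):].strip()
    if body == "[DONE]":
        return "[DONE]"
    return json.loads(body)


if __name__ == "__main__":
    payload = build_process_payload("List three uses of Redis.", "session-demo")
    print(json.dumps(payload, indent=2))
    # With the service from step 3 running, the stream could be consumed as:
    #   import requests
    #   with requests.post("http://localhost:8090/process",
    #                      json=payload, stream=True) as r:
    #       for raw in r.iter_lines(decode_unicode=True):
    #           event = parse_sse_line(raw or "")
    #           if event == "[DONE]":
    #               break
    #           if event is not None:
    #               print(event)
```

The `requests` portion is left commented out because it needs a live server; the payload builder and line parser run standalone.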
## ⚖️ License
Alias-Agent is released under the **Apache 2.0 License**; see the [LICENSE](https://github.com/agentscope-ai/agentscope-samples/blob/main/LICENSE) file for details.


@@ -371,6 +371,83 @@ bash script/start_memory_service.sh
- **Username**: As specified in `FIRST_SUPERUSER_USERNAME` (default: `alias`)
- **Password**: As specified in `FIRST_SUPERUSER_PASSWORD`
### 🌐 Basic Usage -- AgentScope Runtime Deployment
Alias is now compatible with [AgentScope Runtime](https://github.com/agentscope-ai/agentscope-runtime/), which lets you quickly deploy Alias as a standardized backend service. Once launched, you can invoke Alias's capabilities via the accompanying AgentScope Runtime API.
#### 1. Prerequisites
* **Sandbox & API Keys**: Refer to the earlier sections [🐳 Sandbox Setup](#-沙盒设置可选) and [🔑 API Keys Configuration](#-api-密钥配置) to complete the basic environment setup.
* **Environment Variables**: Copy the example environment file from the project root:
```bash
cp .env.example .env
```
* **Start Redis**: Required for caching and session management:
```bash
docker run -d -p 6379:6379 --name alias-redis redis:7-alpine
```
#### 2. Installation & Sandbox Launch
Install the package in editable mode from the project root; this also installs the `alias_agent_runtime` CLI tool:
```bash
pip install -e .
```
To ensure code execution and file operations work correctly, start the sandbox server in a separate terminal:
```bash
runtime-sandbox-server --extension src/alias/runtime/alias_sandbox/alias_sandbox.py
```
#### 3. Launching the AgentScope Runtime Service
Depending on your use case, you can start the service via the CLI or from Python code.
##### Option A: Using the CLI (Recommended)
Use the `alias_agent_runtime` command to launch the backend service with one click:
```bash
alias_agent_runtime --host 127.0.0.1 --port 8090 --chat-mode general
```
**Parameter Descriptions**:
* `--host` / `--port`: Specify the service address and port (default port: 8090).
* `--chat-mode`: Set the running mode. Options: `general`, `dr`, `browser`, `ds`, `finance` (default: `general`).
* `--web-ui`: (Optional) Enable the AgentScope Runtime WebUI for a visual interaction interface. Skip this if you only need the API.
> **Note**: When `--web-ui` is enabled for the first time, the system automatically installs the necessary frontend dependencies, which may take a few minutes.
##### Option B: Using Python Code (Recommended for Developers)
If you wish to integrate or customize the launch logic in Python, use `AliasRunner` together with `AgentApp` as shown below:
```python
from agentscope_runtime.engine.app import AgentApp
from alias.runtime.runtime_compat.runner.alias_runner import AliasRunner

# 1. Initialize AliasRunner
# default_chat_mode options: "general", "dr", "browser", "ds", "finance"
runner = AliasRunner(
    default_chat_mode="general",
)

# 2. Create an AgentApp instance
agent_app = AgentApp(
    runner=runner,
    app_name="Alias",
    app_description="An LLM-empowered agent built on AgentScope and AgentScope-Runtime",
)

# 3. Run the service
# Set web_ui=True to enable the visual debugging interface
agent_app.run(host="127.0.0.1", port=8090)
```
#### 4. Accessing the Application
Once the service is running, you can access Alias via:
* **Runtime API Access**: Send standard HTTP POST requests to `http://localhost:8090/process`. This is the primary way to integrate Alias into third-party frontends or backend workflows.
* **Visual Monitoring (Optional)**: If the service was started with `--web-ui`, visit `http://localhost:5173`. This interface is mainly for developers to observe the agent's reasoning process, tool-call traces, and other debugging information.
## ⚖️ License


@@ -64,4 +64,5 @@ dev = [
]
[project.scripts]
alias_agent = "alias.cli:main"
alias_agent_runtime = "alias.server.alias_agent_app:main"


@@ -1,7 +1,8 @@
# -*- coding: utf-8 -*-
"""Runtime module for Alias"""
__all__ = ["alias_sandbox", "runtime_compat"]
# Import submodule to make it accessible via alias.runtime.alias_sandbox
from . import alias_sandbox # noqa: E402, F401
from . import runtime_compat # noqa: E402, F401


@@ -0,0 +1,203 @@
# -*- coding: utf-8 -*-
import json
from typing import Any, AsyncIterator, Dict, Optional, Union

from agentscope_runtime.engine.helpers.agent_api_builder import ResponseBuilder
from agentscope_runtime.engine.schemas.agent_schemas import (
    Content,
    ContentType,
    FunctionCall,
    FunctionCallOutput,
    Message,
    MessageType,
    Role,
)


def _try_deep_parse(val: Any) -> Any:
    """
    Recursively parse JSON-like strings into native Python objects.
    """
    if isinstance(val, str):
        content = val.strip()
        if (content.startswith("{") and content.endswith("}")) or (
            content.startswith("[") and content.endswith("]")
        ):
            try:
                parsed = json.loads(content)
                return _try_deep_parse(parsed)
            except Exception:
                # If nested JSON parsing fails, treat it as a normal string.
                return val
        return val
    if isinstance(val, list):
        return [_try_deep_parse(i) for i in val]
    if isinstance(val, dict):
        return {k: _try_deep_parse(v) for k, v in val.items()}
    return val


def _ensure_safe_json_string(val: Any) -> str:
    """
    Serialize content into a valid JSON string suitable for WebUI parsing.
    """
    parsed_val = _try_deep_parse(val)
    if parsed_val is None:
        return "{}"
    return json.dumps(parsed_val, ensure_ascii=False)


def _extract_alias_output_obj(content_str: str) -> Any:
    """
    Extract the `output` object from Alias nested tool-result content.
    """
    try:
        data = json.loads(content_str)
        if isinstance(data, list) and data:
            return data[0].get("output")
    except Exception:
        # Best-effort parse: if the string is not valid
        # JSON or doesn't follow the expected structure,
        # fall back to returning the original string.
        pass
    return content_str


class AliasAdapterState:
    def __init__(
        self,
        message_builder: Any,
        content_builder: Any,
        runtime_type: str,
    ):
        self.mb = message_builder
        self.cb = content_builder
        self.runtime_type = runtime_type
        self.last_content = ""
        self.is_completed = False


async def adapt_alias_message_stream(
    source_stream: AsyncIterator[Dict[str, Any]],
) -> AsyncIterator[Union[Message, Content]]:
    # pylint: disable=too-many-branches, too-many-statements
    rb = ResponseBuilder()
    state_map: Dict[str, AliasAdapterState] = {}
    last_active_key: Optional[str] = None
    yield rb.created()
    yield rb.in_progress()
    async for chunk in source_stream:
        if not isinstance(chunk, dict) or "data" not in chunk:
            continue
        messages = chunk["data"].get("messages") or []
        for item in messages:
            alias_id = item.get("id")
            inner_msg = item.get("message") or {}
            alias_type = inner_msg.get("type")
            alias_status = inner_msg.get("status")
            tool_call_id = inner_msg.get("tool_call_id") or alias_id
            if alias_type in ["thought", "sub_thought"]:
                runtime_type = MessageType.REASONING
                target_role = Role.ASSISTANT
            elif alias_type in ["tool_call", "tool_use"]:
                runtime_type = MessageType.PLUGIN_CALL
                target_role = Role.ASSISTANT
            elif alias_type == "tool_result":
                runtime_type = MessageType.PLUGIN_CALL_OUTPUT
                target_role = Role.TOOL
            else:
                runtime_type = MessageType.MESSAGE
                target_role = Role.ASSISTANT
            state_key = f"{tool_call_id}_{runtime_type}"
            if last_active_key and last_active_key != state_key:
                old_state = state_map.get(last_active_key)
                if old_state and not old_state.is_completed:
                    yield old_state.cb.complete()
                    yield old_state.mb.complete()
                    old_state.is_completed = True
            last_active_key = state_key
            if state_key not in state_map:
                mb = rb.create_message_builder(role=target_role)
                mb.message.type = runtime_type
                yield mb.get_message_data()
                if runtime_type in [
                    MessageType.PLUGIN_CALL,
                    MessageType.PLUGIN_CALL_OUTPUT,
                ]:
                    c_type = ContentType.DATA
                else:
                    c_type = ContentType.TEXT
                cb = mb.create_content_builder(content_type=c_type)
                state_map[state_key] = AliasAdapterState(mb, cb, runtime_type)
            state = state_map[state_key]
            if runtime_type in [MessageType.MESSAGE, MessageType.REASONING]:
                raw_text = str(inner_msg.get("content") or "")
                if alias_type == "files" and "files" in inner_msg:
                    raw_text = "\n".join(
                        [
                            f"📁 [{f['filename']}]({f['url']})"
                            for f in inner_msg["files"]
                        ],
                    )
                if raw_text.startswith(state.last_content):
                    delta = raw_text[len(state.last_content) :]
                    if delta:
                        yield state.cb.add_text_delta(delta)
                    state.last_content = raw_text
                else:
                    yield state.cb.set_text(raw_text)
                    state.last_content = raw_text
            elif runtime_type == MessageType.PLUGIN_CALL:
                args = inner_msg.get("arguments") or {}
                fc = FunctionCall(
                    call_id=tool_call_id,
                    name=inner_msg.get("tool_name") or "tool",
                    arguments=_ensure_safe_json_string(args),
                )
                yield state.cb.set_data(fc.model_dump())
            elif runtime_type == MessageType.PLUGIN_CALL_OUTPUT:
                output_obj = _extract_alias_output_obj(
                    inner_msg.get("content", ""),
                )
                fco = FunctionCallOutput(
                    call_id=tool_call_id,
                    name=inner_msg.get("tool_name") or "tool",
                    output=_ensure_safe_json_string(output_obj),
                )
                yield state.cb.set_data(fco.model_dump())
            if alias_status == "finished" and not state.is_completed:
                yield state.cb.complete()
                yield state.mb.complete()
                state.is_completed = True
    for state in state_map.values():
        if not state.is_completed:
            try:
                yield state.cb.complete()
                yield state.mb.complete()
                state.is_completed = True
            except Exception:
                # Graceful cleanup: ignore errors during the
                # finalization phase to ensure the main response
                # stream can finish without crashing.
                pass
    yield rb.completed()


@@ -0,0 +1,371 @@
# -*- coding: utf-8 -*-
# pylint: disable=unused-argument
from __future__ import annotations

import asyncio
import uuid
from typing import Any, AsyncGenerator, Dict, Optional, Union

from fastapi_limiter import FastAPILimiter
from pydantic import ValidationError

from agentscope_runtime.engine.runner import Runner
from agentscope_runtime.engine.schemas.agent_schemas import (
    AgentRequest,
    AgentResponse,
    Error,
    RunStatus,
    SequenceNumberGenerator,
)

from alias.server.db.init_db import (
    close_database,
    initialize_database,
    session_scope,
)
from alias.server.core.task_manager import task_manager
from alias.server.exceptions.base import BaseError
from alias.runtime.runtime_compat.adapter.alias_stream_adapter import (
    adapt_alias_message_stream,
)
from alias.server.schemas.chat import ChatRequest
from alias.server.services.chat_service import ChatService
from alias.server.services.conversation_service import ConversationService
from alias.server.utils.logger import setup_logger
from alias.server.utils.redis import redis_client


class AliasRunner(Runner):
    FRAMEWORK_TYPE = "Alias"

    def __init__(
        self,
        default_chat_mode: str = "general",
        default_conv_name: str = "webui",
    ) -> None:
        super().__init__()
        self.framework_type = self.FRAMEWORK_TYPE
        self.default_chat_mode = default_chat_mode
        self.default_conv_name = default_conv_name
        self._session_conv_cache: Dict[str, uuid.UUID] = {}

    async def stop(self) -> None:
        if not getattr(self, "_health", False):
            return
        await super().stop()

    async def query_handler(self, *args: Any, **kwargs: Any) -> Any:
        user_id: uuid.UUID = kwargs["user_id"]
        conversation_id: uuid.UUID = kwargs["conversation_id"]
        chat_request: ChatRequest = kwargs["chat_request"]
        task_id: uuid.UUID = kwargs.get("task_id") or uuid.uuid4()
        service = ChatService()
        response_gen = await service.chat(
            user_id=user_id,
            conversation_id=conversation_id,
            chat_request=chat_request,
            task_id=task_id,
        )
        return response_gen

    async def init_handler(self, *args: Any, **kwargs: Any) -> None:
        print("🚀 Starting Alias API Server...")
        setup_logger()
        await initialize_database()
        await task_manager.start()
        await redis_client.ping()
        try:
            await FastAPILimiter.init(redis_client)
        except Exception as exc:
            print(f"redis init error: {str(exc)}")
        print("✅ Alias startup complete.")

    async def shutdown_handler(self, *args: Any, **kwargs: Any) -> None:
        print("Executing Alias shutdown logic...")
        await task_manager.stop()
        await close_database()
        print("Alias shutdown complete.")

    @staticmethod
    def _extract_text_from_agent_request(req_dict: Dict[str, Any]) -> str:
        agent_input = req_dict.get("input")
        if isinstance(agent_input, str):
            return agent_input
        if isinstance(agent_input, list) and agent_input:
            last = agent_input[-1]
            if isinstance(last, dict):
                content = last.get("content")
                if isinstance(content, str):
                    return content
                if isinstance(content, list):
                    for blk in reversed(content):
                        if isinstance(blk, dict) and blk.get("type") == "text":
                            return blk.get("text") or ""
                if "text" in last and isinstance(last["text"], str):
                    return last["text"]
        return ""

    @staticmethod
    def _to_uuid(val: Any) -> Optional[uuid.UUID]:
        if val is None:
            return None
        if isinstance(val, uuid.UUID):
            return val
        try:
            return uuid.UUID(str(val))
        except Exception:
            return None

    @staticmethod
    def _stable_uuid_from_string(s: str) -> uuid.UUID:
        return uuid.uuid5(uuid.NAMESPACE_DNS, f"alias::{s}")

    async def _get_or_create_conversation_id(
        self,
        session_id: str,
        user_uuid: uuid.UUID,
    ) -> uuid.UUID:
        if session_id in self._session_conv_cache:
            return self._session_conv_cache[session_id]
        async with session_scope() as session:
            service = ConversationService(session=session)
            conversation = await service.create_conversation(
                user_id=user_uuid,
                name=self.default_conv_name,
                description="created by AgentScope Runtime WebUI",
                chat_mode=self.default_chat_mode,
            )
            conv_id = getattr(conversation, "id", None)
            conv_id = (
                conv_id
                if isinstance(conv_id, uuid.UUID)
                else self._to_uuid(conv_id)
            )
            if conv_id is None:
                raise RuntimeError(
                    "ConversationService.create_conversation() "
                    "returned invalid id: "
                    f"{conversation}",
                )
            self._session_conv_cache[session_id] = conv_id
            return conv_id

    async def stream_query_native(
        self,
        request: Union[AgentRequest, dict],
        **kwargs: Any,
    ) -> AsyncGenerator[Any, None]:
        if not self._health:
            raise RuntimeError(
                "Runner has not been started. Please call "
                "'await runner.start()' or use 'async with Runner()' "
                "before calling 'stream_query'.",
            )
        req_dict = (
            request if isinstance(request, dict) else request.model_dump()
        )
        user_id = kwargs.get("user_id") or self._to_uuid(
            req_dict.get("user_id"),
        )
        conversation_id = kwargs.get("conversation_id") or self._to_uuid(
            req_dict.get("conversation_id"),
        )
        task_id = (
            kwargs.get("task_id")
            or self._to_uuid(req_dict.get("task_id"))
            or uuid.uuid4()
        )
        if user_id is None or conversation_id is None:
            yield {
                "error": "missing_context",
                "code": 422,
                "message": (
                    "Native mode requires user_id and conversation_id "
                    "in kwargs or request body."
                ),
            }
            return
        try:
            chat_request_obj = ChatRequest.model_validate(req_dict)
        except ValidationError as exc:
            yield {
                "error": "invalid_request",
                "code": 422,
                "message": "ChatRequest validation failed",
                "detail": exc.errors(),
            }
            return
        except Exception as exc:
            yield {
                "error": "invalid_request",
                "code": 500,
                "message": str(exc),
            }
            return
        try:
            result = self.query_handler(
                user_id=user_id,
                conversation_id=conversation_id,
                task_id=task_id,
                chat_request=chat_request_obj,
            )
            if asyncio.iscoroutine(result):
                result = await result
            async for chunk in result:
                yield chunk
        except Exception as exc:
            if isinstance(exc, BaseError):
                yield {"error": exc.message, "code": exc.code}
            else:
                yield {
                    "error": str(exc),
                    "code": 500,
                    "error_type": exc.__class__.__name__,
                }
            return
        yield "[DONE]"

    async def stream_query(
        self,
        request: Union[AgentRequest, dict],
        **kwargs: Any,
    ) -> AsyncGenerator[Any, None]:
        # pylint: disable=too-many-branches
        # pylint: disable=too-many-statements
        if not self._health:
            raise RuntimeError(
                "Runner has not been started. Please call "
                "'await runner.start()' or use 'async with Runner()' "
                "before calling 'stream_query'.",
            )
        if isinstance(request, AgentRequest):
            req_dict = request.model_dump()
        elif isinstance(request, dict):
            req_dict = request
        else:
            if hasattr(request, "model_dump"):
                req_dict = request.model_dump()
            else:
                req_dict = dict(request)
        request_id = req_dict.get("id") or str(uuid.uuid4())
        session_id = req_dict.get("session_id") or f"session_{uuid.uuid4()}"
        seq_gen = SequenceNumberGenerator()
        response = AgentResponse(id=request_id)
        response.session_id = session_id
        yield seq_gen.yield_with_sequence(response)
        response.in_progress()
        yield seq_gen.yield_with_sequence(response)
        user_text = self._extract_text_from_agent_request(req_dict)
        if not user_text:
            err = Error(
                code="422",
                message="Empty input text in AgentRequest.input.",
            )
            yield seq_gen.yield_with_sequence(response.failed(err))
            return
        raw_user_id = req_dict.get("user_id") or session_id
        user_uuid = self._to_uuid(
            raw_user_id,
        ) or self._stable_uuid_from_string(
            str(raw_user_id),
        )
        conversation_id = self._to_uuid(req_dict.get("conversation_id"))
        if conversation_id is None:
            try:
                conversation_id = await self._get_or_create_conversation_id(
                    session_id=session_id,
                    user_uuid=user_uuid,
                )
            except Exception as exc:
                err = Error(
                    code="500",
                    message=f"Failed to create conversation: {exc}",
                )
                yield seq_gen.yield_with_sequence(response.failed(err))
                return
        task_id = self._to_uuid(req_dict.get("task_id")) or uuid.uuid4()
        try:
            req_chat_mode = req_dict.get("chat_mode") or self.default_chat_mode
            chat_request_obj = ChatRequest.model_validate(
                {
                    "query": user_text,
                    "chat_mode": req_chat_mode,
                },
            )
        except ValidationError as exc:
            err = Error(
                code="422",
                message=f"ChatRequest validation failed: {exc}",
            )
            yield seq_gen.yield_with_sequence(response.failed(err))
            return
        try:
            result = self.query_handler(
                user_id=user_uuid,
                conversation_id=conversation_id,
                task_id=task_id,
                chat_request=chat_request_obj,
            )
            if asyncio.iscoroutine(result):
                result = await result
            async for event in adapt_alias_message_stream(result):
                try:
                    if (
                        getattr(event, "status", None) == RunStatus.Completed
                        and getattr(event, "object", None) == "message"
                    ):
                        response.add_new_message(event)
                except Exception:
                    # Best-effort bookkeeping
                    pass
                yield seq_gen.yield_with_sequence(event)
        except Exception as exc:
            if isinstance(exc, BaseError):
                err = Error(code=str(exc.code), message=exc.message)
            else:
                err = Error(
                    code="500",
                    message=f"Error happens in `query_handler`: {exc}",
                )
            yield seq_gen.yield_with_sequence(response.failed(err))
            return
        try:
            if response.output:
                response.usage = response.output[-1].usage
        except IndexError:
            # Avoid empty message
            pass
        yield seq_gen.yield_with_sequence(response.completed())
        return


@@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
import asyncio
from typing import Optional

from alias.runtime.runtime_compat.runner.alias_runner import AliasRunner

_lock: Optional[asyncio.Lock] = None
_runner: Optional[AliasRunner] = None


async def get_alias_runner() -> AliasRunner:
    global _lock, _runner
    if _runner is not None:
        return _runner
    if _lock is None:
        _lock = asyncio.Lock()
    async with _lock:
        if _runner is not None:
            return _runner
        runner = AliasRunner()
        await runner.start()
        _runner = runner
        return _runner


@@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
from agentscope_runtime.engine.app import AgentApp

from alias.runtime.runtime_compat.runner.alias_runner import AliasRunner

PORT = 8090


def run_app(
    host: str = "127.0.0.1",
    port: int = PORT,
    web_ui: bool = False,
    chat_mode: str = "general",
) -> None:
    agent_app = AgentApp(
        runner=AliasRunner(
            default_chat_mode=chat_mode,
        ),
        app_name="Alias",
        app_description=(
            "An LLM-empowered agent built on AgentScope and AgentScope-Runtime"
        ),
    )
    agent_app.run(host=host, port=port, web_ui=web_ui)


def main() -> None:
    import argparse

    parser = argparse.ArgumentParser(prog="alias_agent_runtime")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=PORT)
    parser.add_argument(
        "--web-ui",
        action="store_true",
        help="Start AgentScope Runtime WebUI (default: False)",
    )
    parser.add_argument(
        "--chat-mode",
        default="general",
        choices=["general", "dr", "browser", "ds", "finance"],
        help=(
            "Default chat mode used by AliasRunner when request doesn't "
            "specify chat_mode."
        ),
    )
    args = parser.parse_args()
    print(
        "[alias_agent_runtime] config:",
        f"host={args.host}",
        f"port={args.port}",
        f"web_ui={args.web_ui}",
        f"chat_mode={args.chat_mode}",
    )
    run_app(
        host=args.host,
        port=args.port,
        web_ui=args.web_ui,
        chat_mode=args.chat_mode,
    )


if __name__ == "__main__":
    main()


@@ -5,7 +5,18 @@ from alias.server.api.v1.auth import router as auth_router
from alias.server.api.v1.conversation import (
router as conversation_router,
)
# Optional backend switch: this import targets Alias's
# original FastAPI-based API router.
# Keep it for users who want to revert/switch back from
# the current AgentScope-Runtime implementation below.
# from alias.server.api.v1.chat import router as chat_router
# Current default:
# AgentScope-Runtime-based API router (functionally equivalent
# to the FastAPI router above).
from alias.server.api.v1.chat_runtime import router as chat_router
from alias.server.api.v1.file import router as file_router
from alias.server.api.v1.inner import router as inner_router
from alias.server.api.v1.share import router as share_router


@@ -0,0 +1,154 @@
# -*- coding: utf-8 -*-
# pylint: disable=unused-argument
import json
import uuid
from typing import Any, AsyncIterator

from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from loguru import logger
from starlette.types import Receive

from alias.server.api.deps import CurrentUser
from alias.server.exceptions.base import BaseError
from alias.server.schemas.chat import (
    ChatRequest,
    StopChatPayload,
    StopChatResponse,
)
from alias.server.services.chat_service import ChatService
from alias.server.utils.request_context import request_context_var
from alias.runtime.runtime_compat.runner.alias_runner import AliasRunner

router = APIRouter(prefix="/conversations", tags=["conversations/chat"])


class EnhancedStreamingResponse(StreamingResponse):
    """
    StreamingResponse with client disconnect handling.
    """

    def __init__(
        self,
        content: Any,
        user_id: uuid.UUID,
        task_id: uuid.UUID,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        super().__init__(content, *args, **kwargs)
        self.user_id = user_id
        self.task_id = task_id

    async def listen_for_disconnect(self, receive: Receive) -> None:
        while True:
            message = await receive()
            if message["type"] == "http.disconnect":
                logger.warning(
                    f"Chat stopped by disconnect from client: "
                    f"task_id={self.task_id}",
                )
                service = ChatService()
                await service.stop_chat(
                    user_id=self.user_id,
                    task_id=self.task_id,
                )
                break


def _to_raw_sse_event(data: Any) -> str:
    """
    Convert a chunk from runner.stream_query_native into
    a raw SSE event string.
    """
    if data == "[DONE]":
        return "data: [DONE]\n\n"
    if hasattr(data, "model_dump"):
        data = data.model_dump()
    return f"data: {json.dumps(data, ensure_ascii=False)}\n\n"


async def event_generator(
    runner: AliasRunner,
    request_dict: dict,
    **runner_kwargs: Any,
) -> AsyncIterator[str]:
    """
    Convert AliasRunner.stream_query_native output into
    a raw SSE string stream.
    """
    try:
        async for chunk in runner.stream_query_native(
            request_dict,
            **runner_kwargs,
        ):
            yield _to_raw_sse_event(chunk)
    except Exception as e:
        if not isinstance(e, BaseError):
            e = BaseError(code=500, message=str(e))
        error_data = {
            "code": e.code,
            "message": e.message,
        }
        yield _to_raw_sse_event(error_data)
        yield _to_raw_sse_event("[DONE]")


@router.post("/{conversation_id}/chat")
async def chat(
    current_user: CurrentUser,
    conversation_id: uuid.UUID,
    chat_request: ChatRequest,
) -> EnhancedStreamingResponse:
    """Run chat via AliasRunner and stream results as SSE."""
    request_context = request_context_var.get()
    request_id = request_context.request_id
    task_id = uuid.UUID(request_id) if request_id else uuid.uuid4()
    user_id = current_user.id
    from alias.runtime.runtime_compat.runner.alias_runner_singleton import (
        get_alias_runner,
    )

    runner = await get_alias_runner()
    request_dict = chat_request.model_dump()
    return EnhancedStreamingResponse(
        event_generator(
            runner,
            request_dict,
            user_id=user_id,
            conversation_id=conversation_id,
            task_id=task_id,
        ),
        media_type="text/event-stream",
        user_id=user_id,
        task_id=task_id,
    )


@router.post(
    "/{conversation_id}/chat/{task_id}/stop",
    response_model=StopChatResponse,
)
async def stop_chat(
    current_user: CurrentUser,
    conversation_id: uuid.UUID,
    task_id: uuid.UUID,
) -> StopChatResponse:
    service = ChatService()
    await service.stop_chat(
        user_id=current_user.id,
        task_id=task_id,
    )
    return StopChatResponse(
        status=True,
        message="Stop chat successfully.",
        payload=StopChatPayload(
            conversation_id=conversation_id,
            task_id=task_id,
        ),
    )