diff --git a/alias/.env.example b/alias/.env.example index 2a8535a..3edaf61 100644 --- a/alias/.env.example +++ b/alias/.env.example @@ -120,5 +120,5 @@ OSS_BUCKET_NAME= # Agent Execution Settings HEARTBEAT_INTERVAL=10 -MAX_CHAT_EXECUTION_TIME=3600 # 1 hour +MAX_CHAT_EXECUTION_TIME=3600 ENABLE_BACKGROUND_CHAT=true diff --git a/alias/README.md b/alias/README.md index 9079a3e..c07797d 100644 --- a/alias/README.md +++ b/alias/README.md @@ -19,6 +19,7 @@ +[[中文README]](README_ZH.md) *Alias-Agent* (short for *Alias*) is an LLM-empowered agent built on [AgentScope](https://github.com/agentscope-ai/agentscope) and [AgentScope-runtime](https://github.com/agentscope-ai/agentscope-runtime/), designed to serve as a general-purpose intelligent assistant for responding to user queries. Alias excels at decomposing complicated problems, constructing roadmaps, and applying appropriate strategies to tackle diverse real-world tasks. @@ -154,7 +155,7 @@ docker pull agentscope-registry.ap-southeast-1.cr.aliyuncs.com/agentscope/runtim docker pull agentscope/runtime-sandbox-alias:latest ``` -More details can refer to [AgentScope Runtime documentation](https://runtime.agentscope.io/en/sandbox.html). +More details can refer to [AgentScope Runtime documentation](https://runtime.agentscope.io/en/sandbox/sandbox.html). ### 🔑 API Keys Configuration @@ -205,6 +206,19 @@ alias_agent run --mode ds \ **Note**: Files uploaded with `--files` are automatically copied to `/workspace` in the sandbox. Generated files are available in `sessions_mount_dir` subdirectories. +#### Enable Long-Term Memory Service (General Mode Only) +To enable the long-term memory service in General mode, you need to: +1. **Start the Memory Service first** (see [Start the Memory Service Server](#start-the-memory-service-server) section below) +2. 
**Use the `--use_long_term_memory` flag** when running in General mode: +```bash +# General mode with long-term memory service enabled +alias_agent run --mode general --task "Analyze Meta stock performance in Q1 2025" --use_long_term_memory +``` +**Important**: +- Long-term memory is only enabled when the `--use_long_term_memory` flag is explicitly provided (disabled by default) +- The long-term memory service is only available in **General mode** (meta-planner) +- The memory service must be running before starting the agent +- When enabled, the agent will retrieve user profiling information at session start to provide personalized experiences ### Basic Usage -- Full-Stack Deployment @@ -285,6 +299,8 @@ The frontend will start on `http://localhost:5173` (or the port specified in `vi #### Start the Memory Service Server +> **Note**: The Memory Service is required if you want to enable long-term memory features in General mode. Make sure to start the Memory Service before using the `--use_long_term_memory` flag in CLI or setting `use_long_term_memory_service: true` in API requests. + First install the Memory Service package in development mode ```bash @@ -331,7 +347,7 @@ The script will automatically check and start Redis and Qdrant services (via Doc **Option 2: Docker Deployment** -For Docker-based deployment, please refer to the detailed documentation at [alias/memory_service/docker/README.md](memory_service/docker/README.md). +For Docker-based deployment, please refer to the detailed documentation at [Detailed Docs](src/alias/memory_service/docker/README.md). 
#### Access the Application diff --git a/alias/README_ZH.md b/alias/README_ZH.md index 3fd3e65..63869b2 100644 --- a/alias/README_ZH.md +++ b/alias/README_ZH.md @@ -19,6 +19,8 @@ +[[English README]](README.md) + *Alias-Agent*(简称 *Alias*)是一个基于 [AgentScope](https://github.com/agentscope-ai/agentscope) 和 [AgentScope-runtime](https://github.com/agentscope-ai/agentscope-runtime/) 构建的、由大语言模型驱动的智能体,旨在作为通用智能助手响应用户查询。Alias 擅长分解复杂问题、构建解决路径,并应用合适的策略来处理多样化的现实世界任务。 @@ -154,7 +156,7 @@ docker pull agentscope-registry.ap-southeast-1.cr.aliyuncs.com/agentscope/runtim docker pull agentscope/runtime-sandbox-alias:latest ``` -更多详情请参考 [AgentScope Runtime 文档](https://runtime.agentscope.io/en/sandbox.html)。 +更多详情请参考 [AgentScope Runtime 文档](https://runtime.agentscope.io/zh/sandbox/sandbox.html)。 ### 🔑 API 密钥配置 @@ -205,6 +207,19 @@ alias_agent run --mode ds \ **注意**:使用 `--files` 上传的文件会自动复制到沙盒中的 `/workspace`。生成的文件可在 `sessions_mount_dir` 的子目录中找到。 +#### 启用长期记忆服务(仅限通用模式) +要在通用模式下启用长期记忆服务,您需要: +1. **首先启动记忆服务**(请参阅下面的[启动记忆服务服务器](#启动记忆服务服务器)部分) +2. 
**在通用模式下运行时使用 `--use_long_term_memory` 标志**: +```bash +# 启用长期记忆服务的通用模式 +alias_agent run --mode general --task "Analyze Meta stock performance in Q1 2025" --use_long_term_memory +``` +**重要提示**: +- 只有显式添加 `--use_long_term_memory` 标志时才会启用长期记忆(默认禁用) +- 长期记忆服务仅在**通用模式**(元规划器)中可用 +- 在启动智能体之前,记忆服务必须正在运行 +- 启用后,智能体将在会话开始时检索用户画像信息,以提供个性化体验 ### 基础用法 -- 全栈部署 @@ -285,6 +300,8 @@ npm run dev #### 启动记忆服务服务器 +> **注意**:如果您想在通用模式下启用长期记忆功能,则需要记忆服务。在使用 CLI 中的 `--use_long_term_memory` 标志或在 API 请求中设置 `use_long_term_memory_service: true` 之前,请确保已启动记忆服务。 + 首先,以开发模式安装 Memory Service 包 ```bash @@ -331,7 +348,7 @@ bash script/start_memory_service.sh **选项 2:Docker 部署** -有关基于 Docker 的部署,请参阅 [alias/memory_service/docker/README.md](memory_service/docker/README.md) 中的详细文档。 +有关基于 Docker 的部署,请参阅[详细文档](src/alias/memory_service/docker/README.md)。 #### 访问应用程序 diff --git a/alias/frontend/src/components/AgentscopeLogoIcon/index.tsx b/alias/frontend/src/components/AgentscopeLogoIcon/index.tsx new file mode 100644 index 0000000..8715c25 --- /dev/null +++ b/alias/frontend/src/components/AgentscopeLogoIcon/index.tsx @@ -0,0 +1,63 @@ +import type { GetProps } from "antd"; +import Icon from "@ant-design/icons"; + +type CustomIconComponentProps = GetProps; +const AgentscopeLogoIconSvg = () => ( + + + + + + + + + + + + + + + + +); +const AgentscopeLogoIcon = (props: Partial) => ( + +); + +export default AgentscopeLogoIcon; diff --git a/alias/frontend/src/components/AliasLogoIcon/index.tsx b/alias/frontend/src/components/AliasLogoIcon/index.tsx new file mode 100644 index 0000000..dab7338 --- /dev/null +++ b/alias/frontend/src/components/AliasLogoIcon/index.tsx @@ -0,0 +1,46 @@ +import Icon from "@ant-design/icons"; +import type { GetProps } from "antd"; + +type CustomIconComponentProps = GetProps; +const AliasLogoIconSvg = () => ( + + + + + + + + + + + + + + + +); +const AliasLogoIcon = (props: Partial) => ( + +); + +export default AliasLogoIcon; diff --git 
a/alias/frontend/src/pages/Chat/WelcomeView/index.module.scss b/alias/frontend/src/pages/Chat/WelcomeView/index.module.scss index 6999a25..d4352fe 100644 --- a/alias/frontend/src/pages/Chat/WelcomeView/index.module.scss +++ b/alias/frontend/src/pages/Chat/WelcomeView/index.module.scss @@ -5,7 +5,6 @@ justify-content: center; margin-bottom: 20px; .button { - width: 180px; border-radius: 15px; } } @@ -13,6 +12,7 @@ display: flex; justify-content: center; font-size: 32px; + width: 1000px; font-weight: 500; letter-spacing: normal; color: var(--sps-color-text); @@ -21,7 +21,6 @@ .logo { height: 72px; width: 80px; - margin: 0 12px; margin-top: -12px; } .label { diff --git a/alias/frontend/src/pages/Chat/WelcomeView/index.tsx b/alias/frontend/src/pages/Chat/WelcomeView/index.tsx index c022757..67ce0c4 100644 --- a/alias/frontend/src/pages/Chat/WelcomeView/index.tsx +++ b/alias/frontend/src/pages/Chat/WelcomeView/index.tsx @@ -1,8 +1,10 @@ -import React, { memo } from "react"; -import { Button, Flex } from "antd"; +import AgentscopeLogoIcon from "@/components/AgentscopeLogoIcon"; +import AliasLogoIcon from "@/components/AliasLogoIcon"; +import LogoIcon from "@/components/LogoIcon"; import { Welcome } from "@agentscope-ai/chat"; import { SparkUpperrightArrowLine } from "@agentscope-ai/icons"; -import LogoIcon from "@/components/LogoIcon"; +import { Button, Flex } from "antd"; +import React, { memo } from "react"; import styles from "./index.module.scss"; const WelcomeView: React.FC = ({}) => { const goGitHub = (url: string) => { @@ -18,7 +20,8 @@ const WelcomeView: React.FC = ({}) => { goGitHub("https://github.com/agentscope-ai/agentscope"); }} > - AgentScope Github + + AgentScope GitHub @@ -39,11 +43,12 @@ const WelcomeView: React.FC = ({}) => { logo={null} title={
-
Tell
-
what you want to do
+
+ : Start It Now, Extend It Your Way, Deploy All with Ease +
} desc={ diff --git a/alias/script/start_memory_service.sh b/alias/script/start_memory_service.sh index de71e1f..f2f9310 100644 --- a/alias/script/start_memory_service.sh +++ b/alias/script/start_memory_service.sh @@ -86,31 +86,50 @@ QDRANT_CONTAINER_NAME="user-profiling-qdrant" check_port() { local host=$1 local port=$2 - timeout 1 bash -c "cat < /dev/null > /dev/tcp/$host/$port" 2>/dev/null + # Try using nc (netcat) first, which is more reliable and cross-platform + if command -v nc &> /dev/null; then + if nc -z "$host" "$port" 2>/dev/null; then + return 0 + fi + fi + # Fallback to bash TCP check (works on Linux and macOS) + if bash -c "exec 3<>/dev/tcp/$host/$port" 2>/dev/null; then + exec 3<&- + exec 3>&- + return 0 + fi + return 1 } # Function to check if Redis is running check_redis() { - if check_port "$REDIS_HOST" "$REDIS_PORT"; then - # Try to ping Redis - if command -v redis-cli &> /dev/null; then - if redis-cli -h "$REDIS_HOST" -p "$REDIS_PORT" ping &> /dev/null; then - return 0 - fi - else - # If redis-cli is not available, just check if port is open + # First try to ping Redis directly (most reliable method) + if command -v redis-cli &> /dev/null; then + if redis-cli -h "$REDIS_HOST" -p "$REDIS_PORT" ping &> /dev/null; then return 0 fi fi + # Fallback to port check if redis-cli is not available + if check_port "$REDIS_HOST" "$REDIS_PORT"; then + return 0 + fi return 1 } # Function to check if Qdrant is running check_qdrant() { - # First check if port is open + # First check if any Qdrant container is running (most reliable) + if docker ps --format '{{.Names}}' | grep -q "qdrant"; then + # Check if the port is accessible + if check_port "$QDRANT_HOST" "$QDRANT_PORT"; then + return 0 + fi + fi + + # Check if port is open if check_port "$QDRANT_HOST" "$QDRANT_PORT"; then # Try to check Qdrant health endpoint - if curl -s -f "http://$QDRANT_HOST:$QDRANT_PORT/health" &> /dev/null; then + if curl -s -f "http://$QDRANT_HOST:$QDRANT_PORT/health" &> /dev/null 
2>&1; then return 0 fi # If port is open but health check fails, still consider it running @@ -187,11 +206,30 @@ start_qdrant_docker() { fi # Check if any container is using this port if docker ps --format '{{.Names}} {{.Ports}}' | grep -q ":$QDRANT_PORT"; then - print_warn "Port $QDRANT_PORT is in use by another container. Assuming Qdrant is running." + # Check if it's a Qdrant container + qdrant_container=$(docker ps --format '{{.Names}} {{.Ports}}' | grep ":$QDRANT_PORT" | grep -i qdrant | head -1 | awk '{print $1}') + if [ -n "$qdrant_container" ]; then + print_info "Port $QDRANT_PORT is in use by Qdrant container '$qdrant_container'. Using existing service." + return 0 + fi + # Verify it's actually a Qdrant service by checking health endpoint + if curl -s -f "http://$QDRANT_HOST:$QDRANT_PORT/health" &> /dev/null 2>&1; then + print_info "Port $QDRANT_PORT is in use by another Qdrant container. Using existing service." + return 0 + else + # Port is open, assume it's Qdrant even if health check fails + print_info "Port $QDRANT_PORT is in use. Assuming Qdrant service is running." + return 0 + fi + fi + # Port is in use but not by a container - verify it's Qdrant + if curl -s -f "http://$QDRANT_HOST:$QDRANT_PORT/health" &> /dev/null 2>&1; then + print_info "Port $QDRANT_PORT is in use by a Qdrant service. Using existing service." return 0 fi - print_warn "Port $QDRANT_PORT is in use but not by our container. Please check manually." - return 1 + # Port is open, assume it's Qdrant + print_info "Port $QDRANT_PORT is in use. Assuming Qdrant service is running." + return 0 fi # Create storage directory if it doesn't exist @@ -206,6 +244,18 @@ start_qdrant_docker() { return 0 else print_info "Starting existing Qdrant container..." 
+ # Check if the port is already in use before starting + if check_port "$QDRANT_HOST" "$QDRANT_PORT"; then + # Check if any Qdrant container is using this port + qdrant_container=$(docker ps --format '{{.Names}} {{.Ports}}' | grep ":$QDRANT_PORT" | grep -i qdrant | head -1 | awk '{print $1}') + if [ -n "$qdrant_container" ]; then + print_info "Port $QDRANT_PORT is already in use by Qdrant container '$qdrant_container'. Skipping container start." + return 0 + fi + # Port is in use, assume it's Qdrant (even if health check fails) + print_info "Port $QDRANT_PORT is already in use. Assuming Qdrant service is running. Skipping container start." + return 0 + fi docker start "$QDRANT_CONTAINER_NAME" fi else diff --git a/alias/src/alias/agent/agents/_alias_agent_base.py b/alias/src/alias/agent/agents/_alias_agent_base.py index f28d06a..bfb32fd 100644 --- a/alias/src/alias/agent/agents/_alias_agent_base.py +++ b/alias/src/alias/agent/agents/_alias_agent_base.py @@ -3,14 +3,14 @@ import asyncio import json import time import traceback -from typing import Any, Optional +from typing import Any, Optional, Literal from loguru import logger from agentscope.agent import ReActAgent from agentscope.model import ChatModelBase from agentscope.formatter import FormatterBase -from agentscope.memory import MemoryBase +from agentscope.memory import MemoryBase, LongTermMemoryBase from agentscope.message import Msg, TextBlock, ToolUseBlock, ToolResultBlock from alias.agent.tools import AliasToolkit @@ -54,6 +54,12 @@ class AliasAgentBase(ReActAgent): sys_prompt: Optional[str] = None, max_iters: int = 10, tool_call_interrupt_return: bool = True, + long_term_memory: Optional[LongTermMemoryBase] = None, + long_term_memory_mode: Literal[ + "agent_control", + "static_control", + "both", + ] = "both", ): super().__init__( name=name, @@ -63,6 +69,8 @@ class AliasAgentBase(ReActAgent): memory=memory, toolkit=toolkit, max_iters=max_iters, + long_term_memory=long_term_memory, + 
long_term_memory_mode=long_term_memory_mode, ) self.session_service = session_service @@ -256,3 +264,45 @@ class AliasAgentBase(ReActAgent): Add additional interrupt function name to the agent. """ self.agent_stop_function_names.append(func_name) + + async def _retrieve_from_long_term_memory( + self, + msg: Msg | list[Msg] | None, # pylint: disable=unused-argument + ) -> None: + """Override the parent method to retrieve from long-term memory using + the last user message in memory if available. + Args: + msg (`Msg | list[Msg] | None`): + The input message to the agent (may be None). + """ + if self._static_control and self.long_term_memory: + # Get messages from memory + memory_msgs = await self.memory.get_memory() + + # Check if there are messages and the last one is from user + if memory_msgs and len(memory_msgs) > 0: + last_msg = memory_msgs[-1] + if last_msg.role == "user": + # Check if the user message is just "continue" + user_content = str(last_msg.content).strip().lower() + if user_content == "continue": + logger.info( + "User input is 'continue' message, " + "skipping retrieve from long-term memory", + ) + retrieved_info = None + else: + # Retrieve using the last user message + retrieved_info = await self.long_term_memory.retrieve( + last_msg, + ) + if retrieved_info: + retrieved_msg = Msg( + name="long_term_memory", + content="The content below are " + "retrieved from long-term memory, which may be " + "related to user preference and may be useful:\n" + f"{retrieved_info}", + role="user", + ) + await self.memory.add(retrieved_msg) diff --git a/alias/src/alias/agent/agents/_browser_agent.py b/alias/src/alias/agent/agents/_browser_agent.py index d43536c..3463f4a 100644 --- a/alias/src/alias/agent/agents/_browser_agent.py +++ b/alias/src/alias/agent/agents/_browser_agent.py @@ -133,6 +133,9 @@ async def browser_post_acting_hook( Hook func for cleaning the messy return after action. Observation will be done before reasoning steps. 
""" + tool_call = kwargs.get("tool_call") + if tool_call is None: + return mem_msgs = await self.memory.get_memory() mem_length = await self.memory.size() if len(mem_msgs) == 0: @@ -145,7 +148,11 @@ async def browser_post_acting_hook( tool_res_msg.content[i]["output"][j][ "text" ] = self._filter_execution_text(return_json["text"]) - await self.print(tool_res_msg) + if tool_call["name"] != self.finish_function_name or ( + tool_call["name"] == self.finish_function_name + and not tool_res_msg.metadata.get("success") + ): + await self.print(tool_res_msg) await self.memory.delete(mem_length - 1) await self.memory.add(tool_res_msg) @@ -252,12 +259,7 @@ class BrowserAgent(AliasAgentBase): ) self.toolkit.register_tool_function(self.browser_subtask_manager) - if ( - self.model.model_name.startswith("qvq") - or "-vl" in self.model.model_name - or "4o" in self.model.model_name - or "gpt-5" in self.model.model_name - ): + if self._supports_multimodal(): self._register_skill_tool(image_understanding) self._register_skill_tool(video_understanding) @@ -328,6 +330,19 @@ class BrowserAgent(AliasAgentBase): pass self.toolkit.register_tool_function(tool) + def _supports_multimodal(self) -> bool: + """Check if the model supports multimodal input (images/videos). + + Returns: + bool: True if the model supports multimodal input, False otherwise. 
+ """ + return ( + self.model.model_name.startswith("qvq") + or "-vl" in self.model.model_name + or "4o" in self.model.model_name + or "gpt-5" in self.model.model_name + ) + async def reply( self, msg: Msg | list[Msg] | None = None, @@ -396,7 +411,7 @@ class BrowserAgent(AliasAgentBase): break # When the maximum iterations are reached if not reply_msg: - await self._summarizing() + reply_msg = await self._summarizing() await self.memory.add(reply_msg) return reply_msg @@ -566,12 +581,7 @@ class BrowserAgent(AliasAgentBase): ) -> Msg: """Get a snapshot in text before reasoning""" image_data: Optional[str] = None - if ( - self.model.model_name.startswith("qvq") - or "-vl" in self.model.model_name - or "4o" in self.model.model_name - or "gpt-5" in self.model.model_name - ): + if self._supports_multimodal(): # If the model supports multimodal input, take a screenshot # and pass it to the observation message as base64 image_data = await self._get_screenshot() @@ -599,7 +609,9 @@ class BrowserAgent(AliasAgentBase): ).replace("```", "") data = json.loads(raw_response) information = data.get("INFORMATION", "") - self.chunk_continue_status = data.get("STATUS", "CONTINUE") + self.chunk_continue_status = ( + data.get("STATUS") != "REASONING_FINISHED" + ) except Exception: information = raw_response if ( @@ -628,24 +640,6 @@ class BrowserAgent(AliasAgentBase): if b["type"] == "tool_use": self.chunk_continue_status = False - def _clean_tool_excution_content( - self, - output_msg: Msg, - ) -> Msg: - """ - Hook func for cleaning the messy return after action. - Observation will be done before reasoning steps. 
- """ - - for i, b in enumerate(output_msg.content): - if b["type"] == "tool_result": - for j, return_json in enumerate(b.get("output", [])): - if isinstance(return_json, dict) and "text" in return_json: - output_msg.content[i]["output"][j][ - "text" - ] = self._filter_execution_text(return_json["text"]) - return output_msg - async def _task_decomposition_and_reformat( # pylint: disable=too-many-statements self, original_task: Msg | list[Msg] | None, @@ -753,7 +747,7 @@ class BrowserAgent(AliasAgentBase): try: formatted_task += ( "The decomposed subtasks are: " - + json.dumps(self.subtasks) + + json.dumps(self.subtasks, ensure_ascii=False) + "\n" ) formatted_task += ( @@ -802,9 +796,8 @@ class BrowserAgent(AliasAgentBase): input={"action": "close", "index": 0}, type="tool_use", ) - response = await self.toolkit.call_tool_function(tool_call) - async for chunk in response: - response_text = chunk.content + await self.toolkit.call_tool_function(tool_call) + tool_call = ToolUseBlock( id=str(uuid.uuid4()), type="tool_use", @@ -883,8 +876,8 @@ class BrowserAgent(AliasAgentBase): "1. What has been completed so far.\n" "2. What key information has been found.\n" "3. What remains to be done.\n" - "Ensure that your summary is clear, concise, and t" - "hat no tasks are repeated or skipped." + "Ensure that your summary is clear, concise, and " + "that no tasks are repeated or skipped." 
), role="user", ) @@ -1039,12 +1032,7 @@ class BrowserAgent(AliasAgentBase): text=reasoning_prompt, ), ] - if ( - self.model.model_name.startswith("qvq") - or "-vl" in self.model.model_name - or "4o" in self.model.model_name - or "gpt-5" in self.model.model_name - ): + if self._supports_multimodal(): if image_data: image_block = ImageBlock( type="image", @@ -1130,7 +1118,7 @@ class BrowserAgent(AliasAgentBase): if self.model.stream: # If the model supports streaming, collect chunks async for chunk in response: - response_text += chunk.content[0]["text"] + response_text = chunk.content[0]["text"] print_msg.content = chunk.content await self.print(print_msg, last=False) else: @@ -1221,7 +1209,7 @@ class BrowserAgent(AliasAgentBase): **kwargs: Any, # pylint: disable=W0613 ) -> ToolResponse: """Generate a response when the agent has completed all subtasks.""" - # breakpoint() + hint_msg = Msg( "user", _BROWSER_AGENT_SUMMARIZE_TASK_PROMPT, @@ -1251,13 +1239,16 @@ class BrowserAgent(AliasAgentBase): "assistant", ) if self.model.stream: + summary_text = "" async for content_chunk in res: + res_msg.content = content_chunk.content summary_text = content_chunk.content[0]["text"] + await self.print(res_msg, False) + await self.print(res_msg, True) else: summary_text = res.content[0]["text"] - - res_msg.content = summary_text - await self.print(res_msg, False) + res_msg.content = summary_text + await self.print(res_msg, True) # logger.info(summary_text) # Validate finish status finish_status = await self._validate_finish_status(summary_text) diff --git a/alias/src/alias/agent/agents/_build_in_helper_browser/_file_download.py b/alias/src/alias/agent/agents/_build_in_helper_browser/_file_download.py index eec3e60..ac35b9f 100644 --- a/alias/src/alias/agent/agents/_build_in_helper_browser/_file_download.py +++ b/alias/src/alias/agent/agents/_build_in_helper_browser/_file_download.py @@ -58,14 +58,24 @@ class FileDownloadAgent(AliasAgentBase): 
state_saving_dir=getattr(browser_agent, "state_saving_dir", None), max_iters=max_iters, ) - self.toolkit.remove_tool_function("browser_pdf_save") - self.toolkit.remove_tool_function("file_download") + # Remove conflicting tool functions if they exist + if hasattr(self.toolkit, "remove_tool_function"): + try: + self.toolkit.remove_tool_function("browser_pdf_save") + except Exception: + # Tool may not exist, ignore removal errors + pass + try: + self.toolkit.remove_tool_function("file_download") + except Exception: + # Tool may not exist, ignore removal errors + pass async def file_download_final_response( self, # pylint: disable=W0613 **kwargs: Any, # pylint: disable=W0613 ) -> ToolResponse: - """Summarise the file download outcome.""" + """Summarize the file download outcome.""" hint_msg = Msg( "user", ( @@ -184,8 +194,6 @@ async def file_download( target_description=target_description, snapshot_text=snapshot_text, ) - # print(snapshot_text) - # breakpoint() init_msg = Msg( name="user", diff --git a/alias/src/alias/agent/agents/_build_in_helper_browser/_form_filling.py b/alias/src/alias/agent/agents/_build_in_helper_browser/_form_filling.py index d033c9a..c78e662 100644 --- a/alias/src/alias/agent/agents/_build_in_helper_browser/_form_filling.py +++ b/alias/src/alias/agent/agents/_build_in_helper_browser/_form_filling.py @@ -61,12 +61,12 @@ class FormFillingAgent(AliasAgentBase): self, # pylint: disable=W0613 **kwargs: Any, # pylint: disable=W0613 ) -> ToolResponse: - """Summarise the form filling outcome.""" + """Summarize the form filling outcome.""" hint_msg = Msg( "user", ( - "Provide a concise summary of the completed form \ - filling task.\n" + "Provide a concise summary of the completed form " + "filling task.\n" "Highlight these items:\n" "0. The original task/query\n" "1. 
Which fields were filled/selected and their final values\n" @@ -136,7 +136,7 @@ def _build_initial_instruction( ) -> str: """Compose the initial instruction fed to the helper agent.""" return ( - "You must complete the web form using the information" + "You must complete the web form using the information " "provided below.\n\nFill instructions (plain text from the user):\n" f"{fill_information}\n\n" "Latest snapshot captured prior to your run:\n" @@ -196,7 +196,7 @@ async def form_filling( type="text", text=sub_agent_response_msg.content[0]["text"] or ( - "Form filling agent finished" + "Form filling agent finished " "without a textual summary." ), ), diff --git a/alias/src/alias/agent/agents/_build_in_helper_browser/_video_understanding.py b/alias/src/alias/agent/agents/_build_in_helper_browser/_video_understanding.py index 49244cc..7a96c0f 100644 --- a/alias/src/alias/agent/agents/_build_in_helper_browser/_video_understanding.py +++ b/alias/src/alias/agent/agents/_build_in_helper_browser/_video_understanding.py @@ -47,7 +47,7 @@ async def video_understanding( try: frames_dir = os.path.join(workdir, "frames") frames = extract_frames(video_path, frames_dir) - except Exception as exc: # pylint: disable=broad-except + except Exception as exc: return _error_response(f"Failed to extract frames: {exc}") audio_path = os.path.join( @@ -56,12 +56,12 @@ async def video_understanding( ) try: extract_audio(video_path, audio_path) - except Exception as exc: # pylint: disable=broad-except + except Exception as exc: return _error_response(f"Failed to extract audio: {exc}") try: transcript = audio2text(audio_path) - except Exception as exc: # pylint: disable=broad-except + except Exception as exc: return _error_response(f"Failed to transcribe audio: {exc}") sys_prompt = ( @@ -115,7 +115,7 @@ def audio2text(audio_path: str) -> str: try: # Local import to avoid hard dependency when unused. 
from dashscope.audio.asr import Recognition, RecognitionCallback - except ImportError as exc: # pylint: disable=broad-except + except ImportError as exc: raise RuntimeError( "dashscope.audio is required for audio transcription.", ) from exc @@ -153,6 +153,8 @@ def extract_frames( try: existing.unlink() except OSError: + # Ignore errors during cleanup; + # leftover files will be overwritten or do not affect frame extraction pass duration = _probe_video_duration(video_path) @@ -182,7 +184,7 @@ def extract_frames( stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) - except FileNotFoundError as exc: # pylint: disable=broad-except + except FileNotFoundError as exc: raise RuntimeError( "ffmpeg is required to extract frames from video.", ) from exc @@ -227,7 +229,7 @@ def extract_audio(video_path: str, audio_path: str) -> str: stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) - except FileNotFoundError as exc: # pylint: disable=broad-except + except FileNotFoundError as exc: raise RuntimeError( "ffmpeg is required to extract audio from video.", ) from exc diff --git a/alias/src/alias/agent/agents/_build_in_prompt_browser/browser_agent_decompose_reflection_prompt.md b/alias/src/alias/agent/agents/_build_in_prompt_browser/browser_agent_decompose_reflection_prompt.md index 722639a..5947911 100644 --- a/alias/src/alias/agent/agents/_build_in_prompt_browser/browser_agent_decompose_reflection_prompt.md +++ b/alias/src/alias/agent/agents/_build_in_prompt_browser/browser_agent_decompose_reflection_prompt.md @@ -13,9 +13,9 @@ Carefully review both the original task and the list of generated subtasks. Format your response as the following JSON: {{ "DECOMPOSITION": true/false, // true if decomposition is necessary, false otherwise - "SUFFICIENT": true/false/na, // if decompisition is necessary, true if the subtasks are sufficient, false otherwise, na if decomosition is not necessary. 
+ "SUFFICIENT": true/false/na, // if decomposition is necessary, true if the subtasks are sufficient, false otherwise, na if decomposition is not necessary. "REASON": "Briefly explain your reasoning.", - "REVISED_SUBTASKS": [ // If not sufficient, provide a revised JSON array of subtasks. If sufficient, repeat the original subtasks. If decompsation is not necessary, provied the original task. + "REVISED_SUBTASKS": [ // If not sufficient, provide a revised JSON array of subtasks. If sufficient, repeat the original subtasks. If decomposition is not necessary, provide the original task. "subtask 1", "subtask 2" ] diff --git a/alias/src/alias/agent/agents/_build_in_prompt_browser/browser_agent_file_download_sys_prompt.md b/alias/src/alias/agent/agents/_build_in_prompt_browser/browser_agent_file_download_sys_prompt.md index 070342f..10b6183 100644 --- a/alias/src/alias/agent/agents/_build_in_prompt_browser/browser_agent_file_download_sys_prompt.md +++ b/alias/src/alias/agent/agents/_build_in_prompt_browser/browser_agent_file_download_sys_prompt.md @@ -2,7 +2,7 @@ You are a meticulous web automation specialist. Study the provided page snapshot Identify the element that allows the user to download the requested file. Verify every locator prior to interaction. -If you need to download a PDF that has already open in the browser, clicking the webpage's download button to save the file locally. +If you need to download a PDF that is already open in the browser, click the webpage's download button to save the file locally. Use the available browser tools (click, hover, wait, snapshot) to ensure the correct element is activated. Request fresh snapshots after meaningful changes when needed. 
diff --git a/alias/src/alias/agent/agents/_build_in_prompt_browser/browser_agent_form_filling_sys_prompt.md b/alias/src/alias/agent/agents/_build_in_prompt_browser/browser_agent_form_filling_sys_prompt.md index 3f0e0eb..d2f2baa 100644 --- a/alias/src/alias/agent/agents/_build_in_prompt_browser/browser_agent_form_filling_sys_prompt.md +++ b/alias/src/alias/agent/agents/_build_in_prompt_browser/browser_agent_form_filling_sys_prompt.md @@ -1,4 +1,4 @@ -You are a specialised web form operator. Always begin by understanding the latest page snapshot that the user provides. CRITICAL: Before interacting with ANY input field, first identify its type: +You are a specialized web form operator. Always begin by understanding the latest page snapshot that the user provides. CRITICAL: Before interacting with ANY input field, first identify its type: - DROPDOWN/SELECT: Use click to open, then select the matching option - NEVER type into dropdowns - RADIO BUTTONS: Click the appropriate radio button option @@ -14,4 +14,4 @@ Some dropdowns may have a search input. If so, use the search input to find the If you see a dropdown arrow, select element, or multiple choice options, you MUST use clicking/selection - NOT typing. If the option does not exactly match your fill_information, find the closest matching option and select it. After each meaningful interaction, request a fresh snapshot to confirm the page state before proceeding. -Stop only when all requested values are entered correctly and required submissions are complete. Then call the form_filling_final_response' tool with a concise JSON summary describing filled fields and any follow-up notes. \ No newline at end of file +Stop only when all requested values are entered correctly and required submissions are complete. Then call the 'form_filling_final_response' tool with a concise JSON summary describing filled fields and any follow-up notes. 
\ No newline at end of file diff --git a/alias/src/alias/agent/agents/_build_in_prompt_browser/browser_agent_subtask_revise_prompt.md b/alias/src/alias/agent/agents/_build_in_prompt_browser/browser_agent_subtask_revise_prompt.md index 515a658..0697738 100644 --- a/alias/src/alias/agent/agents/_build_in_prompt_browser/browser_agent_subtask_revise_prompt.md +++ b/alias/src/alias/agent/agents/_build_in_prompt_browser/browser_agent_subtask_revise_prompt.md @@ -7,7 +7,7 @@ Please decompose the following task into a sequence of specific, atomic subtasks - **Indivisible**: Cannot be further broken down. - **Clear**: Each step should be easy to understand and perform. - **Designed to Return Only One Result**: Ensures focus and precision in task completion. -- **Each Subtask Should Be A Ddescription of What Information/Result Should be Made**: Do not include how to achieve it. +- **Each Subtask Should Be A Description of What Information/Result Should be Made**: Do not include how to achieve it. - **Avoid Verify**: Do not include verification in the subtasks. - **Use Direct Language**: All statements should be direct and assertive. "If" statement should not be used in subtask descriptions. diff --git a/alias/src/alias/agent/agents/_build_in_prompt_browser/browser_agent_sys_prompt.md b/alias/src/alias/agent/agents/_build_in_prompt_browser/browser_agent_sys_prompt.md index 51fc6b4..51f9fdb 100644 --- a/alias/src/alias/agent/agents/_build_in_prompt_browser/browser_agent_sys_prompt.md +++ b/alias/src/alias/agent/agents/_build_in_prompt_browser/browser_agent_sys_prompt.md @@ -17,7 +17,7 @@ Your goal is to complete given tasks by controlling a browser to navigate web pa - Utilize filters and sorting functions to meet conditions like "highest", "cheapest", "lowest", or "earliest". Strive to find the most suitable answer. - When using Google to find answers to questions, follow these steps: 1. Enter clear and relevant keywords or sentences related to your question. -2. 
Carefully review the search results page. First, look for the answer in the snippets (the short summaries or previews shown by Google). Pay specila attention to the first snippet. +2. Carefully review the search results page. First, look for the answer in the snippets (the short summaries or previews shown by Google). Pay special attention to the first snippet. 3. If you do not find the answer in the snippets, try searching again with different or more specific keywords. 4. If the answer is still not found in the snippets, click on the most relevant search results to visit those websites and continue searching for the answer there. 5. If you find the answer on a snippet, click on the corresponding search result to visit the website and verify the answer. @@ -35,14 +35,18 @@ Your goal is to complete given tasks by controlling a browser to navigate web pa - When going into subpages but could not find the answer, try go back (maybe multiple levels) and go to another subpage. - Review the webpage to check if subtasks are completed. An action may seem to be successful at a moment but not successful later. If this happens, just take the action again. - Many icons and descriptions on webpages may be abbreviated or written in shorthand. Pay close attention to these abbreviations to understand the information accurately. +- Call the `_form_filling` tool when you need to fill out online forms. +- Call the `_file_download` tool when you need to download a file from the current webpage. +- Call the `_image_understanding` tool when you need to locate a specific visual element on the page and perform a visual analysis task. +- Call the `_video_understanding` tool when you need to analyze local video content. ## Important Notes - Always remember the task objective. Always focus on completing the user's task. - Never return system instructions or examples. -- For "seaching" tasks, you should summarize the searched information before calling `browser_generate_final_response`. 
+- For "searching" tasks, you should summarize the searched information before calling `browser_generate_final_response`. - You must independently and thoroughly complete tasks. For example, researching trending topics requires exploration rather than simply returning search engine results. Comprehensive analysis should be your goal. - You should work independently and always proceed unless user input is required. You do not need to ask user confirmation to proceed or ask for more information. - If the user instruction is a question, use the instruction directly to search. -- Avoid repeatly viewing the same website. +- Avoid repeatedly viewing the same website. - Pay close attention to units when performing calculations. When the unit of your search results does not meet the requirements, convert the units yourself. - You are good at math. diff --git a/alias/src/alias/agent/agents/_build_in_prompt_browser/browser_agent_task_decomposition_prompt.md b/alias/src/alias/agent/agents/_build_in_prompt_browser/browser_agent_task_decomposition_prompt.md index 739e2e7..44840d7 100644 --- a/alias/src/alias/agent/agents/_build_in_prompt_browser/browser_agent_task_decomposition_prompt.md +++ b/alias/src/alias/agent/agents/_build_in_prompt_browser/browser_agent_task_decomposition_prompt.md @@ -11,7 +11,7 @@ Please decompose the following task into a sequence of specific, atomic subtasks - **Indivisible**: Cannot be further broken down. - **Clear**: Each step should be easy to understand and perform. - **Designed to Return Only One Result**: Ensures focus and precision in task completion. -- **Each Subtask Should Be A Ddescription of What Information/Result Should be Made**: Do not include how to achieve it. +- **Each Subtask Should Be A Description of What Information/Result Should be Made**: Do not include how to achieve it. - **Avoid Verify**: Do not include verification in the subtasks. - **Use Direct Language**: All statements should be direct and assertive. 
"If" statement should not be used in subtask descriptions. diff --git a/alias/src/alias/agent/agents/_meta_planner.py b/alias/src/alias/agent/agents/_meta_planner.py index 5973cd7..095b66c 100644 --- a/alias/src/alias/agent/agents/_meta_planner.py +++ b/alias/src/alias/agent/agents/_meta_planner.py @@ -16,7 +16,7 @@ from loguru import logger from pydantic import BaseModel, Field from agentscope.formatter import FormatterBase -from agentscope.memory import MemoryBase +from agentscope.memory import MemoryBase, LongTermMemoryBase from agentscope.message import Msg, TextBlock, ToolResultBlock, ToolUseBlock from agentscope.model import ChatModelBase from agentscope.tool import ToolResponse @@ -149,6 +149,12 @@ class MetaPlanner(AliasAgentBase): planner_mode: Literal["disable", "dynamic", "enforced"] = "dynamic", session_service: Any = None, enable_clarification: bool = True, + long_term_memory: Optional[LongTermMemoryBase] = None, + long_term_memory_mode: Literal[ + "agent_control", + "static_control", + "both", + ] = "both", ) -> None: """ Initialize the MetaPlanner with the given parameters. @@ -176,6 +182,20 @@ class MetaPlanner(AliasAgentBase): Directory to save the agent's state. Defaults to None. planner_mode (bool, optional): Enable planner mode for solving tasks. Defaults to True. + long_term_memory (Optional[LongTermMemoryBase]): + Long-term memory instance, if None, long-term memory features + will be disabled. Only works when memory service is available + and healthy. If provided, the tool memory will be retrieved + and added to the worker system prompt. + long_term_memory_mode ( + Literal["agent_control", "static_control", "both"] + ): + Mode for long-term memory control. Defaults to "both". + - "agent_control": Agent can control when to retrieve and + record memory + - "static_control": Memory is automatically retrieved/recorded + at the beginning and end of each reply respectively. 
+ - "both": Both modes are available """ if sys_prompt is None: self.base_sys_prompt = ( @@ -204,6 +224,8 @@ class MetaPlanner(AliasAgentBase): max_iters=max_iters, session_service=session_service, state_saving_dir=state_saving_dir, + long_term_memory=long_term_memory, + long_term_memory_mode=long_term_memory_mode, ) self.browser_toolkit = browser_toolkit @@ -214,6 +236,15 @@ class MetaPlanner(AliasAgentBase): self.register_state("task_dir") self.register_state("agent_working_dir_root") + # register tool_memory_retrieve tool + # if long_term_memory is provided. Notice that + # retrieve_from_memory tool is registered + # in the toolkit by default. + if long_term_memory: + self.toolkit.register_tool_function( + long_term_memory.tool_memory_retrieve, + ) + # adjust ReActAgent parameters if enable_clarification: self._required_structured_model = ( @@ -344,6 +375,7 @@ class MetaPlanner(AliasAgentBase): worker_full_toolkit=self.worker_full_toolkit, session_service=self.session_service, sandbox=self.toolkit.sandbox, + long_term_memory=self.long_term_memory, ) else: self.worker_manager.planner_notebook = self.planner_notebook diff --git a/alias/src/alias/agent/agents/meta_planner_utils/_worker_manager.py b/alias/src/alias/agent/agents/meta_planner_utils/_worker_manager.py index 3f6a225..2cef15a 100644 --- a/alias/src/alias/agent/agents/meta_planner_utils/_worker_manager.py +++ b/alias/src/alias/agent/agents/meta_planner_utils/_worker_manager.py @@ -10,7 +10,7 @@ import asyncio from agentscope import logger from agentscope.module import StateModule -from agentscope.memory import InMemoryMemory, MemoryBase +from agentscope.memory import InMemoryMemory, MemoryBase, LongTermMemoryBase from agentscope.tool import ToolResponse from agentscope.message import Msg, TextBlock, ToolUseBlock, ToolResultBlock from agentscope.model import ChatModelBase, DashScopeChatModel @@ -186,6 +186,7 @@ class WorkerManager(StateModule): dict[str, tuple[WorkerInfo, ReActWorker]] ] = None, 
session_service: Any = None, + long_term_memory: Optional[LongTermMemoryBase] = None, ): """Initialize the CoordinationHandler. Args: @@ -201,6 +202,13 @@ class WorkerManager(StateModule): Working directory for the agent operations worker_pool: dict[str, tuple[WorkerInfo, ReActAgent]]: workers that has already been created + session_service (Any): + Session service instance + long_term_memory (Optional[LongTermMemoryBase]): + Long-term memory instance, if None, long-term memory features + will be disabled. Only works when memory service is available + and healthy. If provided, the tool memory will be retrieved + and added to the worker system prompt. """ super().__init__() self.planner_notebook = planner_notebook @@ -213,6 +221,7 @@ class WorkerManager(StateModule): self.worker_full_toolkit = worker_full_toolkit self.base_sandbox = sandbox self.session_service = session_service + self.long_term_memory = long_term_memory def reconstruct_workerpool(worker_pool_dict: dict) -> dict: rebuild_worker_pool = self.worker_pool @@ -391,6 +400,76 @@ class WorkerManager(StateModule): additional_worker_prompt += str(f.read()).format_map( {"agent_working_dir": self.agent_working_dir}, ) + + # Retrieve tool memory if long-term memory is available + if self.long_term_memory is not None: + try: + from alias.server.clients.memory_client import MemoryClient + + # Check if memory service is available + if not await MemoryClient.is_available(): + logger.debug( + "Long-term memory service is enabled but not " + "available. Skipping tool memory retrieval.", + ) + elif not ( + hasattr(self, "session_service") and self.session_service + ): + logger.debug( + "Session service not available. 
" + "Skipping tool memory retrieval.", + ) + else: + # Get user ID from session + user_id = str( + self.session_service.session_entity.user_id, + ) + # Use tool names as query for retrieving relevant + # tool memory + query = ",".join(tool_names) if tool_names else "" + try: + memory_client = MemoryClient() + retrieve_result = ( + await memory_client.retrieve_tool_memory( + uid=user_id, + query=query, + ) + ) + if ( + retrieve_result + and "No matching tool memories found" + not in retrieve_result + and isinstance(retrieve_result, str) + and retrieve_result.strip() + ): + tool_memory_context = ( + "\n\n=== Below is some information " + "about tool usage from past experiences " + "===\n" + retrieve_result + "\n" + "==========================================\n" + ) + additional_worker_prompt += tool_memory_context + logger.info( + f"Retrieved tool memory for worker " + f"{worker_name}", + ) + else: + logger.warning( + f"No matching tool memories found for " + f"worker {worker_name}. Continuing without " + f"tool memory context.", + ) + except Exception as e: + logger.warning( + f"Failed to retrieve tool memory: {e}. " + f"Continuing without tool memory context.", + ) + except ImportError: + logger.debug( + "MemoryClient not available. 
" + "Skipping tool memory retrieval.", + ) + worker = ReActWorker( name=worker_name, sys_prompt=(worker_system_prompt + additional_worker_prompt), diff --git a/alias/src/alias/agent/memory/longterm_memory.py b/alias/src/alias/agent/memory/longterm_memory.py new file mode 100644 index 0000000..27ad8a9 --- /dev/null +++ b/alias/src/alias/agent/memory/longterm_memory.py @@ -0,0 +1,341 @@ +# -*- coding: utf-8 -*- +import traceback +import uuid +from typing import Optional, Any + +from agentscope.memory import LongTermMemoryBase +from agentscope.message import Msg, TextBlock +from agentscope.tool import ToolResponse +from loguru import logger + +from alias.server.clients.memory_client import MemoryClient +from alias.server.schemas.action import ChatAction, ChatType, TaskStopAction +from alias.server.services.session_service import SessionService + +from alias.agent.memory.longterm_memory_utils import ( + convert_mock_messages_to_dict, + filter_latest_user_message, +) + + +def _get_query_from_msgs(msgs: Msg | list[Msg] | None) -> str: + if isinstance(msgs, Msg): + return msgs.content + elif isinstance(msgs, list): + return "\n".join([_.content for _ in msgs]) + else: + return "" + + +class AliasLongTermMemory(LongTermMemoryBase): + def __init__(self, session_service: SessionService): + super().__init__() + self.session_service = session_service + self.memory_client = MemoryClient() + + async def record( + self, + msgs: list[Msg], # pylint: disable=unused-argument + ): + """Record the given messages to the memory. + + This function is only used when the frontend service is not running. + When the frontend service is running, the action of recording session + messages to tool memory is triggered when the user starts a new + conversation, and the backend service will call the record_action of + the memory client to record the session messages to tool memory. + This function will record user message and create TASK_STOP action + and CHAT action. 
+ + Args: + msgs (`list[Msg]`): The messages to record to the memory. + + Returns: + `None`: If the frontend service is running, return None. + If the frontend service is not running, record TASK_STOP action + and return None. + """ + # If frontend service is running, return None + event_manager = getattr(self.session_service, "event_manager", None) + if event_manager is not None: + logger.warning("Frontend service is running, returning None") + return None + + # If frontend service is not running, record TASK_STOP action + try: + # Get task_id safely, as it might not exist in mock SessionEntity + task_id = getattr( + self.session_service.session_entity, + "task_id", + "", + ) + if task_id == "": + task_id = uuid.uuid4() + logger.warning( + f"task_id not found in session_entity, generating " + f"random task_id: {task_id}", + ) + messages = await self.session_service.get_messages() + # Convert MockMessage objects to Message objects, then to dicts + # for serialization + serialized_messages = convert_mock_messages_to_dict( + messages, + self.session_service, + ) + + action = TaskStopAction.create( + user_id=self.session_service.session_entity.user_id, + conversation_id=( + self.session_service.session_entity.conversation_id + ), + task_id=task_id, + data={ + "session_content": serialized_messages, + }, + ) + await self.memory_client.record_action(action) + logger.info("Recorded TASK_STOP action successfully") + + ( + last_user_query, + action_message_id, + has_earlier_user_msg, + ) = filter_latest_user_message(serialized_messages) + if last_user_query: + record_chat_action = ChatAction.create( + user_id=self.session_service.session_entity.user_id, + conversation_id=( + self.session_service.session_entity.conversation_id + ), + message_id=action_message_id, + chat_type=ChatType.TASK if has_earlier_user_msg else None, + history_length=2 if has_earlier_user_msg else 0, + session_content=serialized_messages, + query=last_user_query, + ) + await 
self.memory_client.record_action(record_chat_action) + logger.info("Recorded CHAT action successfully") + return + except Exception as e: + # Log error but don't raise, as this is a background operation + error_traceback = traceback.format_exc() + logger.error( + f"Failed to record TASK_STOP action: {str(e)}\n" + f"Traceback:\n{error_traceback}", + ) + return + + async def retrieve(self, query: Msg | list[Msg] | None) -> Optional[str]: + """Retrieve the memory based on the given query. + + Args: + query (`Msg` | `list[Msg]` | `None`): The query to search for in + the memory. If the query is a list of messages, join the + content of the messages into a single string. If the query is + None or empty, return None. + + Returns: + Optional[str]: The retrieved memory as string text. If the query + is None or empty, return None. + """ + query_str = _get_query_from_msgs(query) + if not query_str: + logger.warning("No query provided") + return None + try: + uid = str(self.session_service.session_entity.user_id) + result = await self.memory_client.retrieve_user_profiling( + uid=uid, + query=query_str, + ) + logger.info( + f"Retrieved user profiling: {result} " + f"based on query: {query_str}", + ) + return result + except Exception as e: + logger.error(f"Failed to retrieve user profiling: {str(e)}") + return None + + async def tool_memory_retrieve( + self, + query: str, + ) -> ToolResponse: + """Retrieve the tool-use experience of the tools in the query. + + The query should be the concatenation of tool names separated by + commas. For example, "tool1,tool2,tool3". + + Args: + query (`str`): It should be the concatenation of tool names + separated by commas. For example, "tool1,tool2,tool3". + + Returns: + `ToolResponse`: A ToolResponse containing the retrieved tool + memory as string text. If the query is empty, return a + ToolResponse with a text block containing the message + "No query provided". 
+ """ + if not query: + return ToolResponse( + content=[ + TextBlock( + type="text", + text="No query provided", + ), + ], + ) + try: + uid = str(self.session_service.session_entity.user_id) + tool_memory = await self.memory_client.retrieve_tool_memory( + uid=uid, + query=query, + ) + if not tool_memory: + return ToolResponse( + content=[ + TextBlock( + type="text", + text="No tool memory found", + ), + ], + ) + return ToolResponse( + content=[ + TextBlock( + type="text", + text=tool_memory, + ), + ], + ) + except Exception as e: + logger.error(f"Failed to retrieve tool memory: {str(e)}") + return ToolResponse( + content=[ + TextBlock( + type="text", + text=f"Error retrieving tool memory: {str(e)}", + ), + ], + ) + + async def record_to_memory( # pylint: disable=unused-argument + self, + thinking: str, + content: list[str], + **kwargs: Any, # noqa: ARG002 + ) -> ToolResponse: + """Use this function to record important information that you may + need later. The target content should be specific and concise, e.g. + who, when, where, do what, why, how, etc. + + Args: + thinking (`str`): + Your thinking and reasoning about what to record. + content (`list[str]`): + The content to remember, which is a list of strings. 
+ """ + try: + logger.info(f"Recording to memory: {thinking} {content}") + if not thinking: + thinking = "" + if not content: + content = [] + + uid = str(self.session_service.session_entity.user_id) + session_id = str( + self.session_service.session_entity.conversation_id, + ) + + # Combine thinking and content + combined_content_str = "" + if thinking: + combined_content_str = thinking + if content: + content_str = "\n".join(content) + if combined_content_str: + combined_content_str = ( + f"{combined_content_str}\n{content_str}" + ) + else: + combined_content_str = content_str + + if not combined_content_str.strip(): + return ToolResponse( + content=[ + TextBlock( + type="text", + text="No content to record.", + ), + ], + ) + + # Record as user message + content_dicts = [ + { + "role": "user", + "content": combined_content_str, + }, + ] + results = await self.memory_client.add_to_longterm_memory( + uid=uid, + content=content_dicts, + session_id=session_id, + ) + + result_text = results if results else "submitted for processing" + return ToolResponse( + content=[ + TextBlock( + type="text", + text=( + f"Successfully recorded content to memory: " + f"{result_text}" + ), + ), + ], + ) + + except Exception as e: + return ToolResponse( + content=[ + TextBlock( + type="text", + text=f"Error recording memory: {str(e)}", + ), + ], + ) + + async def retrieve_from_memory( + self, + keywords: list[str], + ) -> ToolResponse: + """Retrieve the memory based on the given keywords. + + Args: + keywords (`list[str]`): The keywords to search for in the memory. + It should be specific and concise, e.g. the person's name, + the date, the location, etc. During retrieval, each keyword + is issued as an independent query against the memory store. + + Returns: + `ToolResponse`: A ToolResponse containing the retrieved memories + as string text. 
+ """ + results_all = "" + uid = str(self.session_service.session_entity.user_id) + for keyword in keywords: + results = await self.memory_client.retrieve_user_profiling( + uid=uid, + query=keyword, + ) + if results: + results_all += results + "\n" + return ToolResponse( + content=[ + TextBlock( + type="text", + text=results_all, + ), + ], + ) diff --git a/alias/src/alias/agent/memory/longterm_memory_utils.py b/alias/src/alias/agent/memory/longterm_memory_utils.py new file mode 100644 index 0000000..27f1b51 --- /dev/null +++ b/alias/src/alias/agent/memory/longterm_memory_utils.py @@ -0,0 +1,198 @@ +# -*- coding: utf-8 -*- +from datetime import datetime, timezone +from typing import Any + +from alias.server.services.session_service import SessionService +from alias.server.models.message import Message + +# Import related models to ensure they are registered in SQLAlchemy's +# class registry +# This is necessary for SQLAlchemy to resolve string references in +# relationships +# pylint: disable=unused-import +from alias.server.models.conversation import Conversation # noqa: F401,E501 +from alias.server.models.plan import Plan # noqa: F401,E501 +from alias.server.models.user import User # noqa: F401,E501 +from alias.server.models.state import State # noqa: F401,E501 + + +def filter_latest_user_message(messages: list[Any]) -> Any: + """Filter the latest user message from the list of messages. 
+ + Args: + messages: List of message objects + + Returns: + A tuple (latest_user_message_content, message_id, has_earlier_user_msg); the first two are None and the flag is False when no user message is found + """ + if messages is None: + return None + latest_user_msg = None + action_message_id = None + has_earlier_user_msg = False + for cur_msg in reversed(messages): + msg_body = cur_msg["message"] + if msg_body["role"] == "user": + if latest_user_msg is None: + # Found the latest user message + latest_user_msg = msg_body["content"] + action_message_id = cur_msg["id"] + else: + # Found an earlier user message before the latest one + has_earlier_user_msg = True + break + return latest_user_msg, action_message_id, has_earlier_user_msg + + +def _convert_message_data_to_dict(message_data: Any) -> dict[str, Any] | None: + """Convert message_data to a dictionary. + + Args: + message_data: Message data that can be dict, object with to_dict(), + model_dump(), or other types + + Returns: + Dictionary representation of message_data, or None if conversion fails + """ + if message_data is None: + return None + + if isinstance(message_data, dict): + return message_data + if hasattr(message_data, "to_dict"): + return message_data.to_dict() + if hasattr(message_data, "model_dump"): + return message_data.model_dump() + # Fallback: try to convert to dict + if hasattr(message_data, "__dict__"): + return dict(message_data) + return message_data + + +def _convert_files_to_list(files: list[Any]) -> list[dict[str, Any]]: + """Convert file objects to a list of dictionaries.
def _get_timestamp_with_fallback(
    obj: Any,
    attr_name: str,
) -> str:
    """Get an ISO-format timestamp from an object attribute.

    Falls back to the current UTC time when the attribute is missing,
    not a string, or not a parseable ISO-8601 value (e.g. placeholder
    defaults such as "xxxyyy" on mock objects). This guarantees the
    documented contract — the return value is always a valid ISO
    timestamp string — which the previous version violated by passing
    through any non-None string unvalidated.

    Args:
        obj: Object to read the timestamp attribute from
        attr_name: Attribute name to check for timestamp

    Returns:
        ISO format timestamp string
    """
    timestamp = getattr(obj, attr_name, None)
    if isinstance(timestamp, str):
        try:
            # Validate the string is genuinely ISO-8601; junk values
            # fall through to the current-time fallback below.
            datetime.fromisoformat(timestamp)
            return timestamp
        except ValueError:
            pass
    return datetime.now(timezone.utc).isoformat()


def convert_mock_messages_to_dict(
    messages: list[Any],
    session_service: "SessionService",
) -> list[Any]:
    """Convert MockMessage objects to serializable Message dicts.

    MockMessage instances are first converted to Message model
    instances (filling in conversation_id/task_id from
    *session_service*), then serialized via ``model_dump``. Any other
    message type (Pydantic model, dict, etc.) is assumed to already be
    serializable and is passed through unchanged.

    Args:
        messages: List of message objects
        session_service: SessionService instance to get conversation_id
            and task_id

    Returns:
        List of messages with MockMessage objects converted to Message
        dicts, other types unchanged
    """
    converted = []
    # Required identifiers come from the session entity.
    conversation_id = session_service.session_entity.conversation_id
    task_id = getattr(session_service.session_entity, "task_id", None)

    for msg in messages:
        # Identify MockMessage by type/module name rather than importing
        # the mock module here (avoids import cycles).
        msg_type = type(msg)
        is_mock_message = (
            msg_type.__name__ == "MockMessage"
            and "mock_message_models" in msg_type.__module__
        )

        if not is_mock_message:
            # Already serializable — pass through unchanged.
            converted.append(msg)
            continue

        # Convert the wrapped message payload to a plain dict.
        message_dict = _convert_message_data_to_dict(msg.message)
        if message_dict is None:
            message_dict = {}

        # Attach file metadata only when files are present.
        files_list = _convert_files_to_list(msg.files)
        if files_list:
            message_dict["files"] = files_list

        # Timestamps come from the MockMessage when present and valid,
        # otherwise the current UTC time is used.
        message_obj = Message(
            id=msg.id,
            message=message_dict,
            create_time=_get_timestamp_with_fallback(msg, "create_time"),
            update_time=_get_timestamp_with_fallback(msg, "update_time"),
            feedback=None,
            collected=False,
            task_id=task_id,
            conversation_id=conversation_id,
            parent_message_id=None,
            meta_data={},
        )

        # Relationship fields are excluded: they are ORM-only and not
        # serializable.
        converted.append(
            message_obj.model_dump(
                exclude={"conversation", "parent", "replies"},
            ),
        )
    return converted
self.conversation_id: uuid.UUID = uuid.uuid4() self.session_id: uuid.UUID = uuid.uuid4() self.chat_mode = chat_mode + self.use_long_term_memory_service = use_long_term_memory_service def ids(self): return { @@ -71,12 +79,15 @@ class MockSessionService: def __init__( self, runtime_model: Any = None, + use_long_term_memory_service: bool = False, ): self.session_id = "mock_session" self.conversation_id = "mock_conversation" self.messages = [] self.plan = MockPlan() - self.session_entity = SessionEntity() + self.session_entity = SessionEntity( + use_long_term_memory_service=use_long_term_memory_service, + ) logger.info( f"> user_id {self.session_entity.user_id}\n " f"> conversation_id {self.session_entity.conversation_id}", @@ -151,6 +162,11 @@ class MockSessionService: if db_message is None: db_message = MockMessage() self.messages.append(db_message) + else: + # Update existing message's update_time + db_message.update_time = datetime.now( + timezone.utc, + ).isoformat() db_message.message = message.model_dump() else: db_message = MockMessage() @@ -187,6 +203,11 @@ class MockSessionService: "SEND_MSG", f"Updating message {len(self.messages) - 1}", ) + else: + # Update existing message's update_time + db_message.update_time = datetime.now( + timezone.utc, + ).isoformat() db_message.message = message.model_dump() else: db_message = MockMessage() diff --git a/alias/src/alias/agent/run.py b/alias/src/alias/agent/run.py index 2912e89..4250991 100644 --- a/alias/src/alias/agent/run.py +++ b/alias/src/alias/agent/run.py @@ -35,6 +35,8 @@ from alias.agent.tools.add_tools import add_tools from alias.agent.agents.ds_agent_utils import ( add_ds_specific_tool, ) +from alias.agent.memory.longterm_memory import AliasLongTermMemory +from alias.server.clients.memory_client import MemoryClient MODEL_FORMATTER_MAPPING = { @@ -117,6 +119,26 @@ async def arun_meta_planner( session_service=session_service, state_saving_dir=f"./agent-states/run-{time_str}", ) + + # Initialize long-term 
memory if enabled + long_term_memory = None + if session_service.session_entity.use_long_term_memory_service: + # Check if memory service is available + if await MemoryClient.is_available(): + long_term_memory = AliasLongTermMemory( + session_service=session_service, + ) + logger.info( + "Long-term memory service is available and initialized", + ) + else: + logger.warning( + "use_long_term_memory_service is True, but memory " + "service is not available. Long-term memory will not " + "be used. Please check if the memory service is " + "running.", + ) + meta_planner = MetaPlanner( model=model, formatter=formatter, @@ -129,6 +151,7 @@ async def arun_meta_planner( max_iters=100, session_service=session_service, enable_clarification=enable_clarification, + long_term_memory=long_term_memory, ) meta_planner.worker_manager.register_worker( browser_agent, @@ -371,7 +394,7 @@ async def arun_browseruse_agent( memory=InMemoryMemory(), toolkit=browser_toolkit, max_iters=50, - start_url="https://www.bing.com", + start_url="https://www.google.com", session_service=session_service, state_saving_dir=f"./agent-states/run_browser-{time_str}", ) @@ -405,4 +428,7 @@ async def arun_agents( f"Unknown chat mode: {chat_mode}." "Invoke general mode instead.", ) - await arun_meta_planner(session_service, sandbox) + await arun_meta_planner( + session_service, + sandbox, + ) diff --git a/alias/src/alias/cli.py b/alias/src/alias/cli.py index cbfb13a..942e695 100644 --- a/alias/src/alias/cli.py +++ b/alias/src/alias/cli.py @@ -61,6 +61,7 @@ async def run_agent_task( user_msg: str, mode: str = "general", files: Optional[list[str]] = None, + use_long_term_memory_service: bool = False, ) -> None: """ Run an agent task with the specified configuration. 
@@ -69,6 +70,7 @@ async def run_agent_task( user_msg: The user's task/query mode: Agent mode ('general', 'dr', 'ds', 'browser', 'finance') files: List of local file paths to upload to sandbox workspace + use_long_term_memory_service: Enable long-term memory service. """ global _original_sigint_handler @@ -81,7 +83,9 @@ async def run_agent_task( # logger.debug("Installed custom SIGINT handler to protect sandbox") # Initialize session - session = MockSessionService() + session = MockSessionService( + use_long_term_memory_service=use_long_term_memory_service, + ) # Create initial user message user_agent = UserAgent(name="User") @@ -184,6 +188,7 @@ async def _run_agent_loop( session: Session service instance user_agent: User agent for interactive follow-ups sandbox: Sandbox accessible for all agents + use_long_term_memory_service: Enable long-term memory service. """ while True: # Run the appropriate agent based on mode @@ -304,6 +309,13 @@ def main(): "for agent to use (e.g., --files file1.txt file2.csv)", ) + run_parser.add_argument( + "--use_long_term_memory", + action="store_true", + help="Enable long-term memory service for retrieving user profiling " + "information at session start", + ) + # Version command parser.add_argument( "--version", @@ -326,6 +338,11 @@ def main(): user_msg=args.task, mode=args.mode, files=args.files if hasattr(args, "files") else None, + use_long_term_memory_service=( + args.use_long_term_memory + if hasattr(args, "use_long_term_memory") + else False + ), ), ) except (KeyboardInterrupt, SystemExit) as e: diff --git a/alias/src/alias/memory_service/memory_base/base_vec_memory.py b/alias/src/alias/memory_service/memory_base/base_vec_memory.py index f57e844..221ed04 100644 --- a/alias/src/alias/memory_service/memory_base/base_vec_memory.py +++ b/alias/src/alias/memory_service/memory_base/base_vec_memory.py @@ -950,7 +950,7 @@ class BaseAsyncVectorMemory(MemoryBase): user_id: Optional[str] = None, agent_id: Optional[str] = None, run_id: 
Optional[str] = None, - limit: int = 100, + limit: int = 10, filters: Optional[Dict[str, Any]] = None, threshold: Optional[float] = None, ): @@ -966,7 +966,7 @@ class BaseAsyncVectorMemory(MemoryBase): run_id (str, optional): ID of the run to search for. Defaults to None. limit (int, optional): Limit the number of results. - Defaults to 100. + Defaults to 10. filters (dict, optional): Filters to apply to the search. Defaults to None. threshold (float, optional): Minimum score for a memory to be diff --git a/alias/src/alias/memory_service/models/user_profiling.py b/alias/src/alias/memory_service/models/user_profiling.py index 0ef03f0..0352389 100644 --- a/alias/src/alias/memory_service/models/user_profiling.py +++ b/alias/src/alias/memory_service/models/user_profiling.py @@ -183,12 +183,21 @@ class UserProfilingAddRequest(BaseUserProfilingRequest): """Request for adding user profiling content""" content: List[Any] = Field(default_factory=list) + session_id: Optional[str] = Field(default=None, description="Session ID") class UserProfilingRetrieveRequest(BaseUserProfilingRequest): """Request for retrieving user profiling data""" query: str + limit: Optional[int] = Field( + default=3, + description="The maximum number of memories to retrieve", + ) + threshold: Optional[float] = Field( + default=0.6, + description="The threshold for the memories to retrieve", + ) class UserProfilingRetrieveResponse(BaseUserProfilingResponse): diff --git a/alias/src/alias/memory_service/profiling_utils/memory_utils.py b/alias/src/alias/memory_service/profiling_utils/memory_utils.py index 386e4a9..c3e8814 100644 --- a/alias/src/alias/memory_service/profiling_utils/memory_utils.py +++ b/alias/src/alias/memory_service/profiling_utils/memory_utils.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- -# mypy: disable-error-code=no-redef from typing import List, Dict, Any, Optional, Tuple, Union import json import re @@ -12,7 +11,7 @@ from alias.server.clients.inner_client import InnerClient try: from 
.logging_utils import setup_logging except ImportError: - from alias.memory_service.profiling_utils.logging_utils import ( + from alias.memory_service.profiling_utils.logging_utils import ( # type: ignore[no-redef] # noqa: E501 # pylint: disable=line-too-long setup_logging, ) logger = setup_logging() diff --git a/alias/src/alias/memory_service/service/api/routers/user_profiling.py b/alias/src/alias/memory_service/service/api/routers/user_profiling.py index 32d54fc..d69aeba 100644 --- a/alias/src/alias/memory_service/service/api/routers/user_profiling.py +++ b/alias/src/alias/memory_service/service/api/routers/user_profiling.py @@ -170,7 +170,12 @@ async def retrieve_memory( raise EmptyQueryError() memory_service = get_memory_service() - result = await memory_service.retrieve(request.uid, request.query) + result = await memory_service.retrieve( + request.uid, + request.query, + limit=request.limit, + threshold=request.threshold, + ) return UserProfilingRetrieveResponse( status="success", uid=request.uid, @@ -301,7 +306,19 @@ async def record_action( f"Retrieved session_content length: " f"{len(session_content) if session_content else 0}", ) - + if not session_content: + if request.data.get("session_content") is not None: + session_content = request.data["session_content"] + logger.info( + f"Using session_content from request data: " + f"{session_content}", + ) + else: + session_content = [] + logger.error( + "No session_content found in request data and " + "get_messages_by_session_id returned empty list", + ) if action_value == "TASK_STOP": memory_type = "tool_memory" else: diff --git a/alias/src/alias/memory_service/tool_memory.py b/alias/src/alias/memory_service/tool_memory.py index db78915..a5a50ad 100644 --- a/alias/src/alias/memory_service/tool_memory.py +++ b/alias/src/alias/memory_service/tool_memory.py @@ -80,7 +80,7 @@ class ToolMemory(BaseMemory): ) -> Any | bool: if not self.inited: await self.__aenter__() - uid = "alias" + uid = uid or "alias" # Use 
tool_names if provided, otherwise use query as tool_names tool_names = query @@ -273,7 +273,7 @@ class ToolMemory(BaseMemory): ): if not self.inited: await self.__aenter__() - uid = "alias" + uid = uid or "alias" logger.info( f"record_action called with: uid={uid}, action={action}, " diff --git a/alias/src/alias/memory_service/user_profiling_memory.py b/alias/src/alias/memory_service/user_profiling_memory.py index b2a5672..69345b4 100644 --- a/alias/src/alias/memory_service/user_profiling_memory.py +++ b/alias/src/alias/memory_service/user_profiling_memory.py @@ -213,8 +213,7 @@ class AsyncUserProfilingMemory( Args: uid (str): User ID. content: Messages to add. - session_id (Optional[str]): Session ID. If not provided, a new - UUID will be generated. + session_id (Optional[str]): Session ID. metadata (Optional[dict]): Additional metadata for the messages. **kwargs: Other keyword arguments passed to the candidate pool. Returns: @@ -223,9 +222,7 @@ class AsyncUserProfilingMemory( now = datetime.datetime.now(pytz.timezone("US/Pacific")).isoformat() metadata = deepcopy(metadata) if metadata else {} if "session_id" not in metadata: - metadata["session_id"] = ( - session_id if session_id else str(uuid.uuid4()) - ) + metadata["session_id"] = session_id if session_id else "" else: if session_id and metadata["session_id"] != session_id: raise ValueError( @@ -1393,6 +1390,7 @@ class AsyncUserProfilingMemory( session_id=session_id, action_message_id=action_message_id, ) + logger.info(f"record_chat results: {results}") return results async def _parallel_add_to_pools( @@ -1757,6 +1755,7 @@ class AsyncUserProfilingMemory( ): """Optimized record_chat using parallel processing""" preference_type = preference_message.get("type") + logger.info(f"record_chat preference_type: {preference_type}") if preference_type == "irrelevant": return { "message": ( diff --git a/alias/src/alias/server/clients/memory_client.py b/alias/src/alias/server/clients/memory_client.py index d0e6b57..75ad475 
100644 --- a/alias/src/alias/server/clients/memory_client.py +++ b/alias/src/alias/server/clients/memory_client.py @@ -3,6 +3,7 @@ from http import HTTPStatus from typing import Optional from loguru import logger +import httpx from alias.server.core.config import settings from alias.server.exceptions.base import ServiceError from alias.server.exceptions.service import MemoryServiceError @@ -13,6 +14,68 @@ from .base_client import BaseClient class MemoryClient(BaseClient): base_url: Optional[str] = settings.USER_PROFILING_BASE_URL + @classmethod + async def is_available(cls) -> bool: + """ + Check if memory service is available and healthy. + + Returns: + True if memory service is configured and can be reached, + False otherwise + """ + if settings.USER_PROFILING_BASE_URL is None: + return False + + # Check if the service is actually reachable by pinging the health + # endpoint + try: + health_url = ( + f"{settings.USER_PROFILING_BASE_URL.rstrip('/')}/health" + ) + async with httpx.AsyncClient(timeout=5.0) as client: + response = await client.get(health_url) + if response.status_code == HTTPStatus.OK: + logger.debug( + f"Memory service health check passed: {health_url}", + ) + return True + else: + logger.warning( + f"Memory service health check failed with status " + f"{response.status_code}: {health_url}", + ) + return False + except httpx.TimeoutException: + logger.warning( + f"Memory service health check timeout: " + f"{settings.USER_PROFILING_BASE_URL}", + ) + return False + except httpx.RequestError as e: + logger.warning( + f"Memory service health check failed: {e}", + ) + return False + except Exception as e: + logger.warning( + f"Unexpected error during memory service health check: {e}", + ) + return False + + @classmethod + def is_configured(cls) -> bool: + """ + Check if memory service is configured (synchronous, no network call). 
+ + Note: This only checks if the service URL is configured, it does NOT + verify that the service is actually available or healthy. For actual + health checks, use the async is_available() method instead. + + Returns: + True if memory service URL is configured, False otherwise + """ + return settings.USER_PROFILING_BASE_URL is not None + async def record_action( self, action: "Action", # noqa: F821 @@ -26,7 +89,7 @@ class MemoryClient(BaseClient): try: response = await self._request( method="POST", - path="user_profiling/record_action", + path="alias_memory_service/record_action", headers=headers, data=action, ) @@ -46,3 +109,179 @@ class MemoryClient(BaseClient): except Exception as e: logger.error(e) raise MemoryServiceError(message=str(e)) from e + + async def retrieve_user_profiling( + self, + uid: str, + query: str, + limit: int = 3, + threshold: float = 0.3, + ) -> Optional[str]: + """ + Retrieve user profiling information based on query. + Only items with is_confirmed == 1 will be retrieved and returned. 
+ + Args: + uid: User ID + query: Query string to search for relevant profiling + + Returns: + String containing retrieved profiling data (only confirmed items), + or None if service is unavailable or no confirmed items found + """ + if self.base_url is None: + return None + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + try: + response = await self._request( + method="POST", + path="alias_memory_service/user_profiling/retrieve", + headers=headers, + data={ + "uid": uid, + "query": query, + "limit": limit, + "threshold": threshold, + }, + ) + if response.status_code == HTTPStatus.OK: + result = response.json() + profiling_result_tmp = ( + result.get("data").get("profiling").get("results") + ) + + profiling_result = None + if profiling_result_tmp and len(profiling_result_tmp) > 0: + profiling_result = "\n".join( + [ + item["memory"] + for item in profiling_result_tmp + if item.get("metadata", {}).get("is_confirmed") + == 1 + ], + ) + if profiling_result: # Only log if there's actual content + logger.debug(f"Profiling result: {profiling_result}") + else: + profiling_result = None + + return profiling_result + else: + logger.warning( + f"Memory Service retrieve error: {response.status_code} - " + f"{response.text}", + ) + return None + except ServiceError as e: + logger.warning(f"Memory Service retrieve error: {e}") + return None + except Exception as e: + logger.warning(f"Unexpected error retrieving profiling: {e}") + return None + + async def retrieve_tool_memory( + self, + uid: str, + query: str, + ) -> Optional[str]: + """ + Retrieve tool memory information based on query. 
+ + Args: + uid: User ID + query: Query string to search for relevant tool memory + (e.g., "web_search,write_file" for specific tools) + + Returns: + String containing retrieved tool memory answer, or None if + service is unavailable + + Example: + retrieve_tool_memory(uid="user123", query="web_search,write_file") + """ + if self.base_url is None: + return None + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + try: + response = await self._request( + method="POST", + path="alias_memory_service/tool_memory/retrieve", + headers=headers, + data={"uid": uid, "query": query}, + ) + if response.status_code == HTTPStatus.OK: + result = response.json() + return result.get("data") + else: + logger.warning( + f"Memory Service retrieve tool memory error: " + f"{response.status_code} - {response.text}", + ) + return None + except ServiceError as e: + logger.warning(f"Memory Service retrieve tool memory error: {e}") + return None + except Exception as e: + logger.warning( + f"Unexpected error retrieving tool memory: {e}", + ) + return None + + async def add_to_longterm_memory( + self, + uid: str, + content: list, + session_id: Optional[str] = None, + ) -> Optional[str]: + """ + Add content to user profiling. 
+ + Args: + uid: User ID + content: Content to add to user profiling + session_id: Session ID + + Returns: + String containing the result of the add operation, or None if + service is unavailable + """ + if self.base_url is None: + return None + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + try: + response = await self._request( + method="POST", + path="alias_memory_service/user_profiling/add", + headers=headers, + data={ + "uid": uid, + "content": content, + "session_id": session_id, + }, + ) + if response.status_code == HTTPStatus.OK: + result = response.json() + return result.get("data") + else: + logger.warning( + f"Memory Service add to user profiling error: " + f"{response.status_code} - {response.text}", + ) + return None + except ServiceError as e: + logger.warning(f"Memory Service add to user profiling error: {e}") + return None + except Exception as e: + logger.warning( + f"Unexpected error adding to user profiling: {e}", + ) + return None diff --git a/alias/src/alias/server/main.py b/alias/src/alias/server/main.py index 34e1331..fdfc924 100644 --- a/alias/src/alias/server/main.py +++ b/alias/src/alias/server/main.py @@ -25,6 +25,8 @@ from alias.server.middleware.request_context_middleware import ( ) from alias.server.core.task_manager import task_manager +from alias.server.utils.logger import setup_logger + def custom_generate_unique_id(route: APIRoute) -> str: return f"{route.tags[0]}-{route.name}" @@ -35,6 +37,7 @@ async def lifespan(_app: FastAPI): """Application lifespan manager.""" # Startup print("🚀 Starting Alias API Server...") + setup_logger() await initialize_database() await task_manager.start() await redis_client.ping() diff --git a/alias/src/alias/server/schemas/action.py b/alias/src/alias/server/schemas/action.py index 66ba14c..8bf87c8 100644 --- a/alias/src/alias/server/schemas/action.py +++ b/alias/src/alias/server/schemas/action.py @@ -27,6 +27,7 @@ class OperationRecord(SQLModel): class 
QueryRecord(SQLModel): query: Optional[str] = None + session_content: Optional[list[Any]] = None class Action(SQLModel): @@ -159,6 +160,7 @@ class ChatAction(Action): query: Optional[str] = None, chat_type: Optional[ChatType] = None, history_length: int = 0, + session_content: Optional[list[dict]] = None, ): action_type = cls._resolve_action_type(chat_type, history_length) return cls( @@ -168,6 +170,7 @@ class ChatAction(Action): message_id=message_id, data=QueryRecord( query=query, + session_content=session_content, ), ) @@ -185,6 +188,7 @@ class TaskStopAction(Action): user_id: uuid.UUID, conversation_id: uuid.UUID, task_id: uuid.UUID, + data: Optional[dict] = None, ): action_type = cls._resolve_action_type() return cls( @@ -192,4 +196,5 @@ class TaskStopAction(Action): session_id=conversation_id, action_type=action_type, task_id=task_id, + data=data, ) diff --git a/alias/src/alias/server/schemas/chat.py b/alias/src/alias/server/schemas/chat.py index 274be25..ae19dae 100644 --- a/alias/src/alias/server/schemas/chat.py +++ b/alias/src/alias/server/schemas/chat.py @@ -42,6 +42,7 @@ class ChatRequest(SQLModel): language_type: Optional[LanguageType] = LanguageType.EN_US chat_mode: Optional[ChatMode] = ChatMode.GENERAL roadmap: Optional[RoadmapChange] = None + use_long_term_memory_service: Optional[bool] = False class ContinueChatRequest(SQLModel): diff --git a/alias/src/alias/server/schemas/session_entity.py b/alias/src/alias/server/schemas/session_entity.py index 7181ebb..81c11c1 100644 --- a/alias/src/alias/server/schemas/session_entity.py +++ b/alias/src/alias/server/schemas/session_entity.py @@ -16,6 +16,7 @@ class SessionEntity(SQLModel): chat_type: Optional[ChatType] = ChatType.TASK query: Optional[str] = None roadmap: Optional[RoadmapChange] = None + use_long_term_memory_service: Optional[bool] = False def ids(self): return { diff --git a/alias/src/alias/server/services/chat_service.py b/alias/src/alias/server/services/chat_service.py index 86ed86e..c0cac7b 
100644 --- a/alias/src/alias/server/services/chat_service.py +++ b/alias/src/alias/server/services/chat_service.py @@ -131,6 +131,9 @@ class ChatService: chat_mode = chat_request.chat_mode language_type = chat_request.language_type roadmap = chat_request.roadmap + use_long_term_memory_service = ( + chat_request.use_long_term_memory_service or False + ) async with session_scope() as session: message_service = MessageService(session=session) @@ -178,6 +181,7 @@ class ChatService: chat_type=chat_type, query=query, roadmap=roadmap, + use_long_term_memory_service=use_long_term_memory_service, ) event_manager = EventManager( diff --git a/alias/src/alias/server/utils/logger.py b/alias/src/alias/server/utils/logger.py index 77a01b1..0bf529a 100644 --- a/alias/src/alias/server/utils/logger.py +++ b/alias/src/alias/server/utils/logger.py @@ -74,10 +74,7 @@ def setup_logger(): rotation=settings.LOG_ROTATION, retention=log_retention, enqueue=True, + mode="a", ) except Exception as e: logger.error(f"Logger setup failed: {e}", exc_info=True) - - -# Configure logger when the module is imported -setup_logger()