- Remove Docker-based microservices (docker-compose.yml, Makefile, Dockerfiles) - Update start-dev.sh to use backend.app:app entry point - Add shared schema and client modules for service communication - Add team coordination modules (messenger, registry, task_delegator, coordinator) - Add evaluation hooks and skill adaptation hooks - Add skill template and gateway server - Update frontend WebSocket URL configuration - Add explain components for insider and technical analysis Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
70 lines
2.3 KiB
Python
70 lines
2.3 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""Tests for HeartbeatHook."""
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from backend.agents.base.hooks import HeartbeatHook
|
|
|
|
|
|
class TestHeartbeatHook:
    """Tests for HeartbeatHook's handling of the workspace directory.

    Exercises ``HeartbeatHook._read_heartbeat_content`` against a
    ``HEARTBEAT.md`` file that has content, is missing, is empty, or holds
    only whitespace, and checks where the completion flag path points.
    """

    @staticmethod
    def _make_workspace(tmp_path: Path, heartbeat_text=None) -> Path:
        """Create the analyst workspace directory used by every test.

        Args:
            tmp_path: Per-test temporary directory supplied by pytest.
            heartbeat_text: When not ``None``, written (UTF-8) to
                ``HEARTBEAT.md`` inside the workspace. ``None`` means the
                file is not created at all.

        Returns:
            Path of the freshly created workspace directory.
        """
        ws_dir = tmp_path / "analyst_workspace"
        ws_dir.mkdir()
        if heartbeat_text is not None:
            # An empty string is a valid payload: it creates an empty file,
            # which is distinct from the file being absent.
            (ws_dir / "HEARTBEAT.md").write_text(heartbeat_text, encoding="utf-8")
        return ws_dir

    def test_read_heartbeat_content_with_content(self, tmp_path):
        """HEARTBEAT.md exists with content: the content is returned."""
        ws_dir = self._make_workspace(
            tmp_path,
            "# 定期主动检查\n\n- [ ] 持仓是否健康\n",
        )

        content = HeartbeatHook(workspace_dir=ws_dir)._read_heartbeat_content()

        assert content is not None
        assert "# 定期主动检查" in content
        assert "持仓是否健康" in content

    def test_read_heartbeat_content_absent(self, tmp_path):
        """HEARTBEAT.md does not exist: the result is None."""
        ws_dir = self._make_workspace(tmp_path)

        content = HeartbeatHook(workspace_dir=ws_dir)._read_heartbeat_content()

        assert content is None

    def test_read_heartbeat_content_empty(self, tmp_path):
        """HEARTBEAT.md exists but is empty: the result is None."""
        ws_dir = self._make_workspace(tmp_path, "")

        content = HeartbeatHook(workspace_dir=ws_dir)._read_heartbeat_content()

        assert content is None

    def test_read_heartbeat_content_whitespace_only(self, tmp_path):
        """HEARTBEAT.md holds only whitespace: the result is None."""
        ws_dir = self._make_workspace(tmp_path, " \n\n ")

        content = HeartbeatHook(workspace_dir=ws_dir)._read_heartbeat_content()

        assert content is None

    def test_completed_flag_path(self, tmp_path):
        """The completion flag lives directly inside the workspace directory."""
        ws_dir = self._make_workspace(tmp_path)

        hook = HeartbeatHook(workspace_dir=ws_dir)

        assert hook._completed_flag == ws_dir / ".heartbeat_completed"
|