- Remove Docker-based microservices (docker-compose.yml, Makefile, Dockerfiles) - Update start-dev.sh to use backend.app:app entry point - Add shared schema and client modules for service communication - Add team coordination modules (messenger, registry, task_delegator, coordinator) - Add evaluation hooks and skill adaptation hooks - Add skill template and gateway server - Update frontend WebSocket URL configuration - Add explain components for insider and technical analysis Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
226 lines
7.3 KiB
Python
226 lines
7.3 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""AgentMessenger - Pub/sub inter-agent communication.
|
|
|
|
Provides broadcast(), send(), and subscribe() for message passing
|
|
between agents using AgentScope's Msg format.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
from typing import Any, Callable, Dict, List, Optional, Set
|
|
|
|
from agentscope.message import Msg
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AgentMessenger:
|
|
"""Pub/sub messenger for inter-agent communication.
|
|
|
|
Supports:
|
|
- broadcast(): Send message to all subscribers
|
|
- send(): Send message to specific agent
|
|
- subscribe(): Register callback for agent messages
|
|
- announce(): Send system-wide announcement
|
|
- enable_auto_broadcast: Auto-broadcast agent replies to all participants
|
|
|
|
Messages use AgentScope's Msg format for compatibility.
|
|
"""
|
|
|
|
def __init__(self, enable_auto_broadcast: bool = False):
|
|
"""Initialize the messenger.
|
|
|
|
Args:
|
|
enable_auto_broadcast: If True, agent replies are automatically
|
|
broadcast to all subscribed agents.
|
|
"""
|
|
self._subscriptions: Dict[str, List[Callable[[Msg], None]]] = {}
|
|
self._inbox: Dict[str, List[Msg]] = {}
|
|
self._locks: Dict[str, asyncio.Lock] = {}
|
|
self._enable_auto_broadcast = enable_auto_broadcast
|
|
self._participants: Set[str] = set()
|
|
|
|
def subscribe(
|
|
self,
|
|
agent_id: str,
|
|
callback: Callable[[Msg], None],
|
|
) -> None:
|
|
"""Subscribe an agent to receive messages.
|
|
|
|
Args:
|
|
agent_id: Target agent identifier
|
|
callback: Async function to call when message received
|
|
"""
|
|
if agent_id not in self._subscriptions:
|
|
self._subscriptions[agent_id] = []
|
|
self._subscriptions[agent_id].append(callback)
|
|
logger.debug("Agent %s subscribed to messages", agent_id)
|
|
|
|
def unsubscribe(self, agent_id: str, callback: Callable[[Msg], None]) -> None:
|
|
"""Unsubscribe an agent from messages.
|
|
|
|
Args:
|
|
agent_id: Target agent identifier
|
|
callback: Callback to remove
|
|
"""
|
|
if agent_id in self._subscriptions:
|
|
try:
|
|
self._subscriptions[agent_id].remove(callback)
|
|
logger.debug("Agent %s unsubscribed from messages", agent_id)
|
|
except ValueError:
|
|
pass
|
|
|
|
async def send(
|
|
self,
|
|
to_agent: str,
|
|
message: Msg,
|
|
) -> None:
|
|
"""Send message to specific agent.
|
|
|
|
Args:
|
|
to_agent: Target agent identifier
|
|
message: Message to send (uses Msg format)
|
|
"""
|
|
async def _deliver():
|
|
if to_agent in self._subscriptions:
|
|
for callback in self._subscriptions[to_agent]:
|
|
try:
|
|
if asyncio.iscoroutinefunction(callback):
|
|
await callback(message)
|
|
else:
|
|
callback(message)
|
|
except Exception as e:
|
|
logger.error(
|
|
"Error delivering message to %s: %s",
|
|
to_agent,
|
|
e,
|
|
)
|
|
|
|
await _deliver()
|
|
|
|
async def broadcast(self, message: Msg) -> None:
|
|
"""Broadcast message to all subscribed agents.
|
|
|
|
Args:
|
|
message: Message to broadcast (uses Msg format)
|
|
"""
|
|
delivery_tasks = []
|
|
for agent_id, callbacks in self._subscriptions.items():
|
|
for callback in callbacks:
|
|
async def _deliver(cb=callback, aid=agent_id):
|
|
try:
|
|
if asyncio.iscoroutinefunction(cb):
|
|
await cb(message)
|
|
else:
|
|
cb(message)
|
|
except Exception as e:
|
|
logger.error(
|
|
"Error broadcasting to %s: %s",
|
|
aid,
|
|
e,
|
|
)
|
|
delivery_tasks.append(_deliver())
|
|
|
|
if delivery_tasks:
|
|
await asyncio.gather(*delivery_tasks)
|
|
|
|
def inbox(self, agent_id: str) -> List[Msg]:
|
|
"""Get and clear inbox for agent.
|
|
|
|
Args:
|
|
agent_id: Agent identifier
|
|
|
|
Returns:
|
|
List of messages in inbox
|
|
"""
|
|
messages = self._inbox.get(agent_id, [])
|
|
self._inbox[agent_id] = []
|
|
return messages
|
|
|
|
def inbox_count(self, agent_id: str) -> int:
|
|
"""Count messages in agent's inbox without clearing.
|
|
|
|
Args:
|
|
agent_id: Agent identifier
|
|
|
|
Returns:
|
|
Number of messages waiting
|
|
"""
|
|
return len(self._inbox.get(agent_id, []))
|
|
|
|
def add_participant(self, agent_id: str) -> None:
|
|
"""Add a participant to the messenger.
|
|
|
|
Participants are the agents that can receive auto-broadcast messages.
|
|
|
|
Args:
|
|
agent_id: Agent identifier to add
|
|
"""
|
|
self._participants.add(agent_id)
|
|
logger.debug("Agent %s added as participant", agent_id)
|
|
|
|
def remove_participant(self, agent_id: str) -> None:
|
|
"""Remove a participant from the messenger.
|
|
|
|
Args:
|
|
agent_id: Agent identifier to remove
|
|
"""
|
|
self._participants.discard(agent_id)
|
|
logger.debug("Agent %s removed from participants", agent_id)
|
|
|
|
@property
|
|
def enable_auto_broadcast(self) -> bool:
|
|
"""Check if auto_broadcast is enabled."""
|
|
return self._enable_auto_broadcast
|
|
|
|
@enable_auto_broadcast.setter
|
|
def enable_auto_broadcast(self, value: bool) -> None:
|
|
"""Enable or disable auto_broadcast."""
|
|
self._enable_auto_broadcast = value
|
|
logger.debug("Auto_broadcast set to %s", value)
|
|
|
|
async def announce(self, message: Msg) -> None:
|
|
"""Send a system-wide announcement to all participants.
|
|
|
|
Unlike broadcast(), announce() sends a message from the system/host
|
|
to all participants without requiring prior subscription.
|
|
|
|
Args:
|
|
message: Announcement message (uses Msg format)
|
|
"""
|
|
logger.info("System announcement: %s", message.content)
|
|
await self.broadcast(message)
|
|
|
|
async def auto_broadcast(self, message: Msg) -> None:
|
|
"""Auto-broadcast message to all participants.
|
|
|
|
This is called internally when enable_auto_broadcast is True.
|
|
Broadcasts to all registered participants.
|
|
|
|
Args:
|
|
message: Message to auto-broadcast (uses Msg format)
|
|
"""
|
|
if not self._enable_auto_broadcast:
|
|
return
|
|
|
|
# Broadcast to all participants
|
|
for participant_id in self._participants:
|
|
if participant_id in self._subscriptions:
|
|
for callback in self._subscriptions[participant_id]:
|
|
try:
|
|
if asyncio.iscoroutinefunction(callback):
|
|
await callback(message)
|
|
else:
|
|
callback(message)
|
|
except Exception as e:
|
|
logger.error(
|
|
"Error auto-broadcasting to %s: %s",
|
|
participant_id,
|
|
e,
|
|
)
|
|
|
|
|
|
__all__ = ["AgentMessenger"]
|