800 lines
30 KiB
Python
800 lines
30 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
WebSocket Gateway for frontend communication
|
|
"""
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any, Callable, Dict, List, Optional, Set
|
|
|
|
import websockets
|
|
from websockets.asyncio.server import ServerConnection
|
|
|
|
from backend.data.provider_utils import normalize_symbol
|
|
from backend.domains import news as news_domain
|
|
from backend.llm.models import get_agent_model_info
|
|
from backend.utils.terminal_dashboard import get_dashboard
|
|
from backend.core.pipeline import TradingPipeline
|
|
from backend.core.state_sync import StateSync
|
|
from backend.services.market import MarketService
|
|
from backend.services.storage import StorageService
|
|
from backend.data.provider_router import get_provider_router
|
|
from backend.tools.technical_signals import StockTechnicalAnalyzer
|
|
from backend.core.scheduler import Scheduler
|
|
from backend.services import gateway_admin_handlers
|
|
from backend.services import gateway_cycle_support
|
|
from backend.services import gateway_runtime_support
|
|
from backend.services import gateway_stock_handlers
|
|
from shared.client import NewsServiceClient
|
|
from shared.client import TradingServiceClient
|
|
|
|
logger = logging.getLogger(__name__)
|
|
EDITABLE_AGENT_WORKSPACE_FILES = {
|
|
"SOUL.md",
|
|
"PROFILE.md",
|
|
"AGENTS.md",
|
|
"MEMORY.md",
|
|
"POLICY.md",
|
|
"HEARTBEAT.md",
|
|
"ROLE.md",
|
|
"STYLE.md",
|
|
}
|
|
|
|
|
|
class Gateway:
|
|
"""WebSocket Gateway for frontend communication"""
|
|
|
|
def __init__(
|
|
self,
|
|
market_service: MarketService,
|
|
storage_service: StorageService,
|
|
pipeline: TradingPipeline,
|
|
state_sync: Optional[StateSync] = None,
|
|
scheduler_callback: Optional[Callable] = None,
|
|
scheduler: Optional[Scheduler] = None,
|
|
config: Dict[str, Any] = None,
|
|
):
|
|
self.market_service = market_service
|
|
self.storage = storage_service
|
|
self.pipeline = pipeline
|
|
self.scheduler_callback = scheduler_callback
|
|
self.scheduler = scheduler
|
|
self.config = config or {}
|
|
|
|
self.mode = self.config.get("mode", "live")
|
|
self.is_backtest = self.mode == "backtest" or self.config.get(
|
|
"backtest_mode",
|
|
False,
|
|
)
|
|
|
|
self.state_sync = state_sync or StateSync(storage=storage_service)
|
|
# self.state_sync.set_mode(self.is_backtest)
|
|
self.state_sync.set_broadcast_fn(self.broadcast)
|
|
self.pipeline.state_sync = self.state_sync
|
|
|
|
self.connected_clients: Set[ServerConnection] = set()
|
|
self.lock = asyncio.Lock()
|
|
self._cycle_lock = asyncio.Lock()
|
|
self._backtest_task: Optional[asyncio.Task] = None
|
|
self._manual_cycle_task: Optional[asyncio.Task] = None
|
|
self._backtest_start_date: Optional[str] = None
|
|
self._backtest_end_date: Optional[str] = None
|
|
self._dashboard = get_dashboard()
|
|
self._market_status_task: Optional[asyncio.Task] = None
|
|
self._watchlist_ingest_task: Optional[asyncio.Task] = None
|
|
|
|
# Session tracking for live returns
|
|
self._session_start_portfolio_value: Optional[float] = None
|
|
self._provider_router = get_provider_router()
|
|
self._loop: Optional[asyncio.AbstractEventLoop] = None
|
|
self._project_root = Path(__file__).resolve().parents[2]
|
|
self._technical_analyzer = StockTechnicalAnalyzer()
|
|
|
|
async def start(self, host: str = "0.0.0.0", port: int = 8766):
|
|
"""Start gateway server with proper initialization order.
|
|
|
|
Phase 1: Start WebSocket server first so frontend can connect immediately
|
|
Phase 2: Start market data service (pushes data to connected clients)
|
|
Phase 3: Start scheduler last (triggers trading cycles)
|
|
"""
|
|
logger.info(f"Starting gateway on {host}:{port}")
|
|
self._loop = asyncio.get_running_loop()
|
|
self._provider_router.add_listener(self._on_provider_usage_changed)
|
|
|
|
# Initialize terminal dashboard
|
|
self._dashboard.set_config(
|
|
mode=self.mode,
|
|
config_name=self.config.get("config_name", "default"),
|
|
host=host,
|
|
port=port,
|
|
poll_interval=self.config.get("poll_interval", 10),
|
|
mock=self.config.get("mock_mode", False),
|
|
tickers=self.config.get("tickers", []),
|
|
initial_cash=self.storage.initial_cash,
|
|
start_date=self._backtest_start_date or "",
|
|
end_date=self._backtest_end_date or "",
|
|
data_sources=self._provider_router.get_usage_snapshot(),
|
|
)
|
|
self._dashboard.start()
|
|
|
|
self.state_sync.load_state()
|
|
self.market_service.set_price_recorder(self.storage.record_price_point)
|
|
self.state_sync.update_state("status", "initializing")
|
|
self.state_sync.update_state("server_mode", self.mode)
|
|
self.state_sync.update_state("is_backtest", self.is_backtest)
|
|
self.state_sync.update_state(
|
|
"is_mock_mode",
|
|
self.config.get("mock_mode", False),
|
|
)
|
|
self.state_sync.update_state("tickers", self.config.get("tickers", []))
|
|
self.state_sync.update_state(
|
|
"runtime_config",
|
|
{
|
|
"tickers": self.config.get("tickers", []),
|
|
"schedule_mode": self.config.get("schedule_mode", "daily"),
|
|
"interval_minutes": self.config.get("interval_minutes", 60),
|
|
"trigger_time": self.config.get("trigger_time", "09:30"),
|
|
"initial_cash": self.config.get(
|
|
"initial_cash",
|
|
self.storage.initial_cash,
|
|
),
|
|
"margin_requirement": self.config.get("margin_requirement"),
|
|
"max_comm_cycles": self.config.get("max_comm_cycles"),
|
|
"enable_memory": self.config.get("enable_memory", False),
|
|
},
|
|
)
|
|
self.state_sync.update_state(
|
|
"data_sources",
|
|
self._provider_router.get_usage_snapshot(),
|
|
)
|
|
|
|
# Load and display existing portfolio state if available
|
|
dashboard_snapshot = self.storage.build_dashboard_snapshot_from_state(self.state_sync.state)
|
|
summary = dashboard_snapshot.get("summary")
|
|
if summary:
|
|
holdings = dashboard_snapshot.get("holdings") or []
|
|
trades = dashboard_snapshot.get("trades") or []
|
|
current_date = self.state_sync.state.get("current_date")
|
|
self._dashboard.update(
|
|
date=current_date or "-",
|
|
status="running",
|
|
portfolio=summary,
|
|
holdings=holdings,
|
|
trades=trades,
|
|
)
|
|
logger.info(
|
|
"Loaded existing portfolio: $%s",
|
|
f"{summary.get('totalAssetValue', 0):,.2f}",
|
|
)
|
|
|
|
# ======================================================================
|
|
# PHASE 1: Start WebSocket server first
|
|
# This allows frontend to connect immediately and receive status updates
|
|
# ======================================================================
|
|
logger.info("[Phase 1/4] Starting WebSocket server...")
|
|
self.state_sync.update_state("status", "websocket_ready")
|
|
|
|
# Create server but don't block yet - we'll serve inside the context manager
|
|
server = await websockets.serve(
|
|
self.handle_client,
|
|
host,
|
|
port,
|
|
ping_interval=30,
|
|
ping_timeout=60,
|
|
)
|
|
logger.info(f"WebSocket server ready: ws://{host}:{port}")
|
|
|
|
# Give a brief moment for any existing clients to reconnect
|
|
await asyncio.sleep(0.1)
|
|
|
|
# ======================================================================
|
|
# PHASE 2: Start market data service
|
|
# Now frontend is connected, start pushing price updates
|
|
# ======================================================================
|
|
logger.info("[Phase 2/4] Starting market data service...")
|
|
self.state_sync.update_state("status", "market_service_starting")
|
|
await self.market_service.start(broadcast_func=self.broadcast)
|
|
self.state_sync.update_state("status", "market_service_ready")
|
|
logger.info("Market data service ready - price updates active")
|
|
|
|
# ======================================================================
|
|
# PHASE 3: Start market status monitoring
|
|
# Monitors market open/close and broadcasts status
|
|
# ======================================================================
|
|
logger.info("[Phase 3/4] Starting market status monitoring...")
|
|
if not self.is_backtest:
|
|
self._market_status_task = asyncio.create_task(
|
|
self._market_status_monitor(),
|
|
)
|
|
|
|
# ======================================================================
|
|
# PHASE 4: Start scheduler last
|
|
# Only start trading after everything else is ready
|
|
# ======================================================================
|
|
logger.info("[Phase 4/4] Starting scheduler...")
|
|
self.state_sync.update_state("status", "scheduler_starting")
|
|
|
|
if self.scheduler:
|
|
# Wire up heartbeat callback if heartbeat is configured
|
|
heartbeat_interval = self.config.get("heartbeat_interval", 0)
|
|
if heartbeat_interval and heartbeat_interval > 0:
|
|
self.scheduler.set_heartbeat_callback(self.on_heartbeat_trigger)
|
|
logger.info(
|
|
f"[Heartbeat] Registered heartbeat callback (interval={heartbeat_interval}s)",
|
|
)
|
|
await self.scheduler.start(self.on_strategy_trigger)
|
|
elif self.scheduler_callback:
|
|
await self.scheduler_callback(callback=self.on_strategy_trigger)
|
|
|
|
self.state_sync.update_state("status", "running")
|
|
logger.info(
|
|
f"Gateway fully operational: ws://{host}:{port}, mode={self.mode}",
|
|
)
|
|
|
|
# Keep server running
|
|
await asyncio.Future()
|
|
|
|
def _on_provider_usage_changed(self, snapshot: Dict[str, Any]):
|
|
"""Handle provider routing updates from the shared router."""
|
|
self.state_sync.update_state("data_sources", snapshot)
|
|
self._dashboard.update(data_sources=snapshot)
|
|
if self._loop and self._loop.is_running():
|
|
asyncio.run_coroutine_threadsafe(
|
|
self.broadcast(
|
|
{
|
|
"type": "data_sources_update",
|
|
"data_sources": snapshot,
|
|
},
|
|
),
|
|
self._loop,
|
|
)
|
|
|
|
@property
|
|
def state(self) -> Dict[str, Any]:
|
|
return self.state_sync.state
|
|
|
|
@staticmethod
|
|
def _news_rows_need_enrichment(rows: List[Dict[str, Any]]) -> bool:
|
|
return news_domain.news_rows_need_enrichment(rows)
|
|
|
|
def _news_service_url(self) -> str | None:
|
|
"""Return configured news-service base URL, if any."""
|
|
candidate = self.config.get("news_service_url") or os.getenv(
|
|
"NEWS_SERVICE_URL",
|
|
"",
|
|
)
|
|
value = str(candidate or "").strip()
|
|
return value or None
|
|
|
|
def _trading_service_url(self) -> str | None:
|
|
"""Return configured trading-service base URL, if any."""
|
|
candidate = self.config.get("trading_service_url") or os.getenv(
|
|
"TRADING_SERVICE_URL",
|
|
"",
|
|
)
|
|
value = str(candidate or "").strip()
|
|
return value or None
|
|
|
|
async def _call_news_service(
|
|
self,
|
|
action: str,
|
|
callback: Callable[[NewsServiceClient], Any],
|
|
) -> Any | None:
|
|
"""Call news-service when configured, otherwise return None."""
|
|
service_url = self._news_service_url()
|
|
if not service_url:
|
|
return None
|
|
|
|
try:
|
|
async with NewsServiceClient(service_url) as client:
|
|
return await callback(client)
|
|
except Exception as exc:
|
|
logger.warning("news-service %s failed: %s", action, exc)
|
|
return None
|
|
|
|
async def _call_trading_service(
|
|
self,
|
|
action: str,
|
|
callback: Callable[[TradingServiceClient], Any],
|
|
) -> Any | None:
|
|
"""Call trading-service when configured, otherwise return None."""
|
|
service_url = self._trading_service_url()
|
|
if not service_url:
|
|
return None
|
|
|
|
try:
|
|
async with TradingServiceClient(service_url) as client:
|
|
return await callback(client)
|
|
except Exception as exc:
|
|
logger.warning("trading-service %s failed: %s", action, exc)
|
|
return None
|
|
|
|
async def handle_client(self, websocket: ServerConnection):
|
|
"""Handle WebSocket client connection"""
|
|
async with self.lock:
|
|
self.connected_clients.add(websocket)
|
|
|
|
await self._send_initial_state(websocket)
|
|
await self._handle_client_messages(websocket)
|
|
|
|
async with self.lock:
|
|
self.connected_clients.discard(websocket)
|
|
|
|
async def _send_initial_state(self, websocket: ServerConnection):
|
|
try:
|
|
logger.info("[Gateway] Sending initial state to client...")
|
|
state_payload = self.state_sync.get_initial_state_payload(
|
|
include_dashboard=True,
|
|
)
|
|
state_payload["data_sources"] = (
|
|
self._provider_router.get_usage_snapshot()
|
|
)
|
|
# Include market status in initial state
|
|
state_payload[
|
|
"market_status"
|
|
] = self.market_service.get_market_status()
|
|
|
|
# Include live returns if session is active
|
|
if self.storage.is_live_session_active:
|
|
live_returns = self.storage.get_live_returns()
|
|
if "portfolio" in state_payload:
|
|
state_payload["portfolio"].update(live_returns)
|
|
|
|
await websocket.send(
|
|
json.dumps(
|
|
{"type": "initial_state", "state": state_payload},
|
|
ensure_ascii=False,
|
|
default=str,
|
|
),
|
|
)
|
|
logger.info("[Gateway] Initial state sent successfully")
|
|
except Exception as e:
|
|
logger.exception(f"[Gateway] Failed to send initial state: {e}")
|
|
# Send error response so client knows something went wrong
|
|
try:
|
|
await websocket.send(
|
|
json.dumps(
|
|
{"type": "error", "message": "Failed to load initial state"},
|
|
ensure_ascii=False,
|
|
),
|
|
)
|
|
except Exception as e:
|
|
logger.warning(f"Failed to send error response to client: {e}")
|
|
|
|
async def _handle_client_messages(
|
|
self,
|
|
websocket: ServerConnection,
|
|
):
|
|
try:
|
|
async for message in websocket:
|
|
data = json.loads(message)
|
|
msg_type = data.get("type", "unknown")
|
|
|
|
if msg_type == "ping":
|
|
await websocket.send(
|
|
json.dumps(
|
|
{
|
|
"type": "pong",
|
|
"timestamp": datetime.now().isoformat(),
|
|
},
|
|
ensure_ascii=False,
|
|
),
|
|
)
|
|
elif msg_type == "get_state":
|
|
await self._send_initial_state(websocket)
|
|
elif msg_type == "start_backtest":
|
|
await self._handle_start_backtest(data)
|
|
elif msg_type == "trigger_strategy":
|
|
await self._handle_manual_trigger(websocket, data)
|
|
elif msg_type == "update_runtime_config":
|
|
await self._handle_update_runtime_config(websocket, data)
|
|
elif msg_type == "reload_runtime_assets":
|
|
await self._handle_reload_runtime_assets()
|
|
elif msg_type == "update_watchlist":
|
|
await self._handle_update_watchlist(websocket, data)
|
|
elif msg_type == "get_agent_skills":
|
|
await self._handle_get_agent_skills(websocket, data)
|
|
elif msg_type == "get_agent_profile":
|
|
await self._handle_get_agent_profile(websocket, data)
|
|
elif msg_type == "get_skill_detail":
|
|
await self._handle_get_skill_detail(websocket, data)
|
|
elif msg_type == "create_agent_local_skill":
|
|
await self._handle_create_agent_local_skill(websocket, data)
|
|
elif msg_type == "update_agent_local_skill":
|
|
await self._handle_update_agent_local_skill(websocket, data)
|
|
elif msg_type == "delete_agent_local_skill":
|
|
await self._handle_delete_agent_local_skill(websocket, data)
|
|
elif msg_type == "remove_agent_skill":
|
|
await self._handle_remove_agent_skill(websocket, data)
|
|
elif msg_type == "update_agent_skill":
|
|
await self._handle_update_agent_skill(websocket, data)
|
|
elif msg_type == "get_agent_workspace_file":
|
|
await self._handle_get_agent_workspace_file(websocket, data)
|
|
elif msg_type == "update_agent_workspace_file":
|
|
await self._handle_update_agent_workspace_file(websocket, data)
|
|
elif msg_type == "get_stock_history":
|
|
await self._handle_get_stock_history(websocket, data)
|
|
elif msg_type == "get_stock_explain_events":
|
|
await self._handle_get_stock_explain_events(websocket, data)
|
|
elif msg_type == "get_stock_news":
|
|
await self._handle_get_stock_news(websocket, data)
|
|
elif msg_type == "get_stock_news_for_date":
|
|
await self._handle_get_stock_news_for_date(websocket, data)
|
|
elif msg_type == "get_stock_news_timeline":
|
|
await self._handle_get_stock_news_timeline(websocket, data)
|
|
elif msg_type == "get_stock_news_categories":
|
|
await self._handle_get_stock_news_categories(websocket, data)
|
|
elif msg_type == "get_stock_range_explain":
|
|
await self._handle_get_stock_range_explain(websocket, data)
|
|
elif msg_type == "get_stock_insider_trades":
|
|
await self._handle_get_stock_insider_trades(websocket, data)
|
|
elif msg_type == "get_stock_story":
|
|
await self._handle_get_stock_story(websocket, data)
|
|
elif msg_type == "get_stock_similar_days":
|
|
await self._handle_get_stock_similar_days(websocket, data)
|
|
elif msg_type == "get_stock_technical_indicators":
|
|
await self._handle_get_stock_technical_indicators(websocket, data)
|
|
elif msg_type == "run_stock_enrich":
|
|
await self._handle_run_stock_enrich(websocket, data)
|
|
|
|
except websockets.ConnectionClosed:
|
|
pass
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
async def _handle_get_stock_history(
|
|
self,
|
|
websocket: ServerConnection,
|
|
data: Dict[str, Any],
|
|
):
|
|
await gateway_stock_handlers.handle_get_stock_history(self, websocket, data)
|
|
|
|
async def _handle_get_stock_explain_events(
|
|
self,
|
|
websocket: ServerConnection,
|
|
data: Dict[str, Any],
|
|
):
|
|
await gateway_stock_handlers.handle_get_stock_explain_events(self, websocket, data)
|
|
|
|
async def _handle_get_stock_news(
|
|
self,
|
|
websocket: ServerConnection,
|
|
data: Dict[str, Any],
|
|
):
|
|
await gateway_stock_handlers.handle_get_stock_news(self, websocket, data)
|
|
|
|
async def _handle_get_stock_news_for_date(
|
|
self,
|
|
websocket: ServerConnection,
|
|
data: Dict[str, Any],
|
|
):
|
|
await gateway_stock_handlers.handle_get_stock_news_for_date(self, websocket, data)
|
|
|
|
async def _handle_get_stock_news_timeline(
|
|
self,
|
|
websocket: ServerConnection,
|
|
data: Dict[str, Any],
|
|
):
|
|
await gateway_stock_handlers.handle_get_stock_news_timeline(self, websocket, data)
|
|
|
|
async def _handle_get_stock_news_categories(
|
|
self,
|
|
websocket: ServerConnection,
|
|
data: Dict[str, Any],
|
|
):
|
|
await gateway_stock_handlers.handle_get_stock_news_categories(self, websocket, data)
|
|
|
|
async def _handle_get_stock_range_explain(
|
|
self,
|
|
websocket: ServerConnection,
|
|
data: Dict[str, Any],
|
|
):
|
|
await gateway_stock_handlers.handle_get_stock_range_explain(self, websocket, data)
|
|
|
|
async def _handle_get_stock_insider_trades(
|
|
self,
|
|
websocket: ServerConnection,
|
|
data: Dict[str, Any],
|
|
):
|
|
await gateway_stock_handlers.handle_get_stock_insider_trades(self, websocket, data)
|
|
|
|
async def _handle_get_stock_story(
|
|
self,
|
|
websocket: ServerConnection,
|
|
data: Dict[str, Any],
|
|
):
|
|
await gateway_stock_handlers.handle_get_stock_story(self, websocket, data)
|
|
|
|
async def _handle_get_stock_similar_days(
|
|
self,
|
|
websocket: ServerConnection,
|
|
data: Dict[str, Any],
|
|
):
|
|
await gateway_stock_handlers.handle_get_stock_similar_days(self, websocket, data)
|
|
|
|
async def _handle_get_stock_technical_indicators(
|
|
self,
|
|
websocket: ServerConnection,
|
|
data: Dict[str, Any],
|
|
):
|
|
await gateway_stock_handlers.handle_get_stock_technical_indicators(self, websocket, data)
|
|
|
|
async def _handle_run_stock_enrich(
|
|
self,
|
|
websocket: ServerConnection,
|
|
data: Dict[str, Any],
|
|
):
|
|
await gateway_stock_handlers.handle_run_stock_enrich(self, websocket, data)
|
|
|
|
async def _handle_start_backtest(self, data: Dict[str, Any]):
|
|
if not self.is_backtest:
|
|
return
|
|
dates = data.get("dates", [])
|
|
if dates and self._backtest_task is None:
|
|
task = asyncio.create_task(
|
|
self._run_backtest_dates(dates),
|
|
)
|
|
task.add_done_callback(self._handle_backtest_exception)
|
|
self._backtest_task = task
|
|
|
|
async def _handle_manual_trigger(
|
|
self,
|
|
websocket: ServerConnection,
|
|
data: Dict[str, Any],
|
|
) -> None:
|
|
"""Run one live/mock trading cycle on demand."""
|
|
if self.is_backtest:
|
|
await websocket.send(
|
|
json.dumps(
|
|
{
|
|
"type": "error",
|
|
"message": "Manual trigger is only available in live/mock mode.",
|
|
},
|
|
ensure_ascii=False,
|
|
),
|
|
)
|
|
return
|
|
|
|
if (
|
|
self._cycle_lock.locked()
|
|
or (
|
|
self._manual_cycle_task is not None
|
|
and not self._manual_cycle_task.done()
|
|
)
|
|
):
|
|
await websocket.send(
|
|
json.dumps(
|
|
{
|
|
"type": "error",
|
|
"message": "A trading cycle is already running.",
|
|
},
|
|
ensure_ascii=False,
|
|
),
|
|
)
|
|
await self.state_sync.on_system_message("已有任务在运行,已忽略手动触发")
|
|
return
|
|
|
|
requested_date = data.get("date")
|
|
await self.state_sync.on_system_message("收到手动触发请求,准备开始新一轮分析与决策")
|
|
task = asyncio.create_task(
|
|
self.on_strategy_trigger(
|
|
date=requested_date or datetime.now().strftime("%Y-%m-%d"),
|
|
),
|
|
)
|
|
task.add_done_callback(self._handle_manual_cycle_exception)
|
|
self._manual_cycle_task = task
|
|
|
|
async def _handle_reload_runtime_assets(self):
|
|
await gateway_admin_handlers.handle_reload_runtime_assets(self)
|
|
|
|
async def _handle_update_runtime_config(
|
|
self,
|
|
websocket: ServerConnection,
|
|
data: Dict[str, Any],
|
|
) -> None:
|
|
await gateway_admin_handlers.handle_update_runtime_config(self, websocket, data)
|
|
|
|
async def _handle_update_watchlist(
|
|
self,
|
|
websocket: ServerConnection,
|
|
data: Dict[str, Any],
|
|
) -> None:
|
|
await gateway_admin_handlers.handle_update_watchlist(self, websocket, data)
|
|
|
|
async def _handle_get_agent_skills(
|
|
self,
|
|
websocket: ServerConnection,
|
|
data: Dict[str, Any],
|
|
) -> None:
|
|
await gateway_admin_handlers.handle_get_agent_skills(self, websocket, data)
|
|
|
|
async def _handle_get_agent_profile(
|
|
self,
|
|
websocket: ServerConnection,
|
|
data: Dict[str, Any],
|
|
) -> None:
|
|
await gateway_admin_handlers.handle_get_agent_profile(self, websocket, data)
|
|
|
|
async def _handle_get_skill_detail(
|
|
self,
|
|
websocket: ServerConnection,
|
|
data: Dict[str, Any],
|
|
) -> None:
|
|
await gateway_admin_handlers.handle_get_skill_detail(self, websocket, data)
|
|
|
|
async def _handle_create_agent_local_skill(
|
|
self,
|
|
websocket: ServerConnection,
|
|
data: Dict[str, Any],
|
|
) -> None:
|
|
await gateway_admin_handlers.handle_create_agent_local_skill(self, websocket, data)
|
|
|
|
async def _handle_update_agent_local_skill(
|
|
self,
|
|
websocket: ServerConnection,
|
|
data: Dict[str, Any],
|
|
) -> None:
|
|
await gateway_admin_handlers.handle_update_agent_local_skill(self, websocket, data)
|
|
|
|
async def _handle_delete_agent_local_skill(
|
|
self,
|
|
websocket: ServerConnection,
|
|
data: Dict[str, Any],
|
|
) -> None:
|
|
await gateway_admin_handlers.handle_delete_agent_local_skill(self, websocket, data)
|
|
|
|
async def _handle_remove_agent_skill(
|
|
self,
|
|
websocket: ServerConnection,
|
|
data: Dict[str, Any],
|
|
) -> None:
|
|
await gateway_admin_handlers.handle_remove_agent_skill(self, websocket, data)
|
|
|
|
async def _handle_update_agent_skill(
|
|
self,
|
|
websocket: ServerConnection,
|
|
data: Dict[str, Any],
|
|
) -> None:
|
|
await gateway_admin_handlers.handle_update_agent_skill(self, websocket, data)
|
|
|
|
async def _handle_get_agent_workspace_file(
|
|
self,
|
|
websocket: ServerConnection,
|
|
data: Dict[str, Any],
|
|
) -> None:
|
|
await gateway_admin_handlers.handle_get_agent_workspace_file(self, websocket, data)
|
|
|
|
async def _handle_update_agent_workspace_file(
|
|
self,
|
|
websocket: ServerConnection,
|
|
data: Dict[str, Any],
|
|
) -> None:
|
|
await gateway_admin_handlers.handle_update_agent_workspace_file(self, websocket, data)
|
|
|
|
@staticmethod
|
|
def _normalize_watchlist(raw_tickers: Any) -> List[str]:
|
|
return gateway_runtime_support.normalize_watchlist(raw_tickers)
|
|
|
|
@staticmethod
|
|
def _normalize_agent_workspace_filename(raw_name: Any) -> Optional[str]:
|
|
return gateway_runtime_support.normalize_agent_workspace_filename(
|
|
raw_name,
|
|
allowlist=EDITABLE_AGENT_WORKSPACE_FILES,
|
|
)
|
|
|
|
def _apply_runtime_config(
|
|
self,
|
|
runtime_config: Dict[str, Any],
|
|
) -> Dict[str, Any]:
|
|
return gateway_runtime_support.apply_runtime_config(self, runtime_config)
|
|
|
|
def _sync_runtime_state(self) -> None:
|
|
gateway_runtime_support.sync_runtime_state(self)
|
|
|
|
def _schedule_watchlist_market_store_refresh(
|
|
self,
|
|
tickers: List[str],
|
|
) -> None:
|
|
gateway_cycle_support.schedule_watchlist_market_store_refresh(self, tickers)
|
|
|
|
async def _refresh_market_store_for_watchlist(
|
|
self,
|
|
tickers: List[str],
|
|
) -> None:
|
|
await gateway_cycle_support.refresh_market_store_for_watchlist(self, tickers)
|
|
|
|
async def broadcast(self, message: Dict[str, Any]):
|
|
"""Broadcast message to all connected clients"""
|
|
if not self.connected_clients:
|
|
return
|
|
|
|
message_json = json.dumps(message, ensure_ascii=False, default=str)
|
|
|
|
async with self.lock:
|
|
tasks = [
|
|
self._send_to_client(client, message_json)
|
|
for client in self.connected_clients.copy()
|
|
]
|
|
|
|
if tasks:
|
|
await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
async def _send_to_client(
|
|
self,
|
|
client: ServerConnection,
|
|
message: str,
|
|
):
|
|
try:
|
|
await client.send(message)
|
|
except websockets.ConnectionClosed:
|
|
async with self.lock:
|
|
self.connected_clients.discard(client)
|
|
|
|
async def _market_status_monitor(self):
|
|
await gateway_cycle_support.market_status_monitor(self)
|
|
|
|
async def _update_and_broadcast_live_returns(self):
|
|
await gateway_cycle_support.update_and_broadcast_live_returns(self)
|
|
|
|
async def on_strategy_trigger(self, date: str):
|
|
await gateway_cycle_support.on_strategy_trigger(self, date)
|
|
|
|
async def on_heartbeat_trigger(self, date: str):
|
|
await gateway_cycle_support.on_heartbeat_trigger(self, date)
|
|
|
|
async def _run_backtest_cycle(self, date: str, tickers: List[str]):
|
|
await gateway_cycle_support.run_backtest_cycle(self, date, tickers)
|
|
|
|
async def _run_live_cycle(self, date: str, tickers: List[str]):
|
|
await gateway_cycle_support.run_live_cycle(self, date, tickers)
|
|
|
|
async def _finalize_cycle(self, date: str):
|
|
await gateway_cycle_support.finalize_cycle(self, date)
|
|
|
|
async def _get_market_caps(
|
|
self,
|
|
tickers: List[str],
|
|
date: str,
|
|
) -> Dict[str, float]:
|
|
return await gateway_cycle_support.get_market_caps(self, tickers, date)
|
|
|
|
async def _broadcast_portfolio_updates(
|
|
self,
|
|
result: Dict[str, Any],
|
|
prices: Dict[str, float],
|
|
):
|
|
await gateway_cycle_support.broadcast_portfolio_updates(self, result, prices)
|
|
|
|
def _save_cycle_results(
|
|
self,
|
|
result: Dict[str, Any],
|
|
date: str,
|
|
prices: Dict[str, float],
|
|
settlement_result: Optional[Dict[str, Any]] = None,
|
|
):
|
|
gateway_cycle_support.save_cycle_results(
|
|
self,
|
|
result,
|
|
date,
|
|
prices,
|
|
settlement_result,
|
|
)
|
|
|
|
async def _run_backtest_dates(self, dates: List[str]):
|
|
await gateway_cycle_support.run_backtest_dates(self, dates)
|
|
|
|
def _handle_backtest_exception(self, task: asyncio.Task):
|
|
gateway_cycle_support.handle_backtest_exception(self, task)
|
|
|
|
def _handle_manual_cycle_exception(self, task: asyncio.Task):
|
|
gateway_cycle_support.handle_manual_cycle_exception(self, task)
|
|
|
|
def set_backtest_dates(self, dates: List[str]):
|
|
gateway_cycle_support.set_backtest_dates(self, dates)
|
|
|
|
def stop(self):
|
|
gateway_cycle_support.stop_gateway(self)
|