From 11849208ed18f11650e79f4eacd09147142e8da3 Mon Sep 17 00:00:00 2001 From: cillin Date: Tue, 7 Apr 2026 13:58:49 +0800 Subject: [PATCH] perf: optimize system concurrency, I/O stability and fix WebSocket disconnects --- backend/agents/base/tool_guard.py | 6 +- backend/api/guard.py | 4 +- backend/apps/frontend_service.py | 7 +- backend/apps/news_service.py | 16 +- backend/apps/trading_service.py | 16 +- backend/core/pipeline.py | 2 +- backend/core/state_sync.py | 6 +- backend/data/market_store.py | 24 ++- backend/llm/models.py | 10 + backend/runtime/agent_runtime.py | 6 +- backend/runtime/manager.py | 8 +- backend/services/gateway.py | 22 ++- backend/services/gateway_cycle_support.py | 18 +- backend/services/gateway_stock_handlers.py | 212 +++++++++++---------- backend/services/storage.py | 31 +-- deploy/README.md | 4 +- deploy/install-production.sh | 164 ++++++++++------ deploy/production-deployment.md | 2 +- deploy/systemd/README.md | 2 +- shared/client/news_client.py | 6 +- shared/client/trading_client.py | 6 +- 21 files changed, 357 insertions(+), 215 deletions(-) mode change 100644 => 100755 deploy/install-production.sh diff --git a/backend/agents/base/tool_guard.py b/backend/agents/base/tool_guard.py index bb3d2db..3c0c645 100644 --- a/backend/agents/base/tool_guard.py +++ b/backend/agents/base/tool_guard.py @@ -12,7 +12,7 @@ from __future__ import annotations import asyncio import json import logging -from datetime import UTC, datetime +from datetime import datetime, timezone from enum import Enum from typing import Any, Callable, Dict, Iterable, List, Optional, Set @@ -78,7 +78,7 @@ class ApprovalRecord: self.session_id = session_id self.status = ApprovalStatus.PENDING self.findings = findings or [] - self.created_at = datetime.now(UTC) + self.created_at = datetime.now(timezone.utc) self.resolved_at: Optional[datetime] = None self.resolved_by: Optional[str] = None self.metadata: Dict[str, Any] = {} @@ -163,7 +163,7 @@ class ToolGuardStore: return record record.status = status - record.resolved_at = datetime.now(UTC) + record.resolved_at = datetime.now(timezone.utc) record.resolved_by = resolved_by if notify_request and record.pending_request: if status == ApprovalStatus.APPROVED: diff --git a/backend/api/guard.py b/backend/api/guard.py index 97b7120..4795737 100644 --- a/backend/api/guard.py +++ b/backend/api/guard.py @@ -7,7 +7,7 @@ Provides REST API endpoints for tool guard operations. from __future__ import annotations from typing import Any, Dict, List, Optional -from datetime import UTC, datetime +from datetime import datetime, timezone from fastapi import APIRouter, HTTPException from pydantic import BaseModel, Field @@ -146,7 +146,7 @@ async def check_tool_call( if request.tool_name in SAFE_TOOLS: record.status = ApprovalStatus.APPROVED - record.resolved_at = datetime.now(UTC) + record.resolved_at = datetime.now(timezone.utc) record.resolved_by = "system" STORE.set_status( record.approval_id, diff --git a/backend/apps/frontend_service.py b/backend/apps/frontend_service.py index a3e47b6..55e7000 100644 --- a/backend/apps/frontend_service.py +++ b/backend/apps/frontend_service.py @@ -81,7 +81,12 @@ async def proxy_ws(ws: WebSocket): await ws.accept() upstream = None try: - upstream = await websockets.asyncio.client.connect(gateway_url) + upstream = await websockets.asyncio.client.connect( + gateway_url, + ping_interval=20, + ping_timeout=120, + max_size=10 * 1024 * 1024, # 10MB + ) async def client_to_upstream(): try: diff --git a/backend/apps/news_service.py b/backend/apps/news_service.py index d3598be..661b41b 100644 --- a/backend/apps/news_service.py +++ b/backend/apps/news_service.py @@ -28,11 +28,11 @@ def create_app() -> FastAPI: add_cors_middleware(app) @app.get("/health") - async def health_check() -> dict[str, str]: + def health_check() -> dict[str, str]: return {"status": "healthy", "service": "news-service"} @app.get("/api/enriched-news") - async def api_get_enriched_news( + def api_get_enriched_news( ticker: str = Query(..., min_length=1), start_date: str | None = Query(None), end_date: str | None = Query(None), @@ -49,7 +49,7 @@ def create_app() -> FastAPI: ) @app.get("/api/news-for-date") - async def api_get_news_for_date( + def api_get_news_for_date( ticker: str = Query(..., min_length=1), date: str = Query(...), limit: int = Query(20, ge=1, le=100), @@ -64,7 +64,7 @@ def create_app() -> FastAPI: ) @app.get("/api/news-timeline") - async def api_get_news_timeline( + def api_get_news_timeline( ticker: str = Query(..., min_length=1), start_date: str = Query(...), end_date: str = Query(...), @@ -79,7 +79,7 @@ def create_app() -> FastAPI: ) @app.get("/api/categories") - async def api_get_categories( + def api_get_categories( ticker: str = Query(..., min_length=1), start_date: str | None = Query(None), end_date: str | None = Query(None), @@ -96,7 +96,7 @@ def create_app() -> FastAPI: ) @app.get("/api/similar-days") - async def api_get_similar_days( + def api_get_similar_days( ticker: str = Query(..., min_length=1), date: str = Query(...), n_similar: int = Query(5, ge=1, le=20), @@ -111,7 +111,7 @@ def create_app() -> FastAPI: ) @app.get("/api/stories/{ticker}") - async def api_get_story( + def api_get_story( ticker: str, as_of_date: str = Query(...), store: MarketStore = Depends(get_market_store), @@ -124,7 +124,7 @@ def create_app() -> FastAPI: ) @app.get("/api/range-explain") - async def api_get_range_explain( + def api_get_range_explain( ticker: str = Query(..., min_length=1), start_date: str = Query(...), end_date: str = Query(...), diff --git a/backend/apps/trading_service.py b/backend/apps/trading_service.py index b06efcd..24edc3f 100644 --- a/backend/apps/trading_service.py +++ b/backend/apps/trading_service.py @@ -29,12 +29,12 @@ def create_app() -> FastAPI: add_cors_middleware(app) @app.get("/health") - async def health_check() -> dict[str, str]: + def health_check() -> dict[str, str]: """Health check endpoint.""" return {"status": "healthy", "service": "trading-service"} @app.get("/api/prices", response_model=PriceResponse) - async def api_get_prices( + def api_get_prices( ticker: str = Query(..., min_length=1), start_date: str = Query(...), end_date: str = Query(...), @@ -47,7 +47,7 @@ def create_app() -> FastAPI: return PriceResponse(ticker=payload["ticker"], prices=payload["prices"]) @app.get("/api/financials", response_model=FinancialMetricsResponse) - async def api_get_financials( + def api_get_financials( ticker: str = Query(..., min_length=1), end_date: str = Query(...), period: str = Query("ttm"), @@ -62,7 +62,7 @@ def create_app() -> FastAPI: return FinancialMetricsResponse(financial_metrics=payload["financial_metrics"]) @app.get("/api/news", response_model=CompanyNewsResponse) - async def api_get_news( + def api_get_news( ticker: str = Query(..., min_length=1), end_date: str = Query(...), start_date: str | None = Query(None), @@ -77,7 +77,7 @@ def create_app() -> FastAPI: return CompanyNewsResponse(news=payload["news"]) @app.get("/api/insider-trades", response_model=InsiderTradeResponse) - async def api_get_insider_trades( + def api_get_insider_trades( ticker: str = Query(..., min_length=1), end_date: str = Query(...), start_date: str | None = Query(None), @@ -92,12 +92,12 @@ def create_app() -> FastAPI: return InsiderTradeResponse(insider_trades=payload["insider_trades"]) @app.get("/api/market/status") - async def api_get_market_status() -> dict[str, Any]: + def api_get_market_status() -> dict[str, Any]: """Return current market status using the existing market service logic.""" return trading_domain.get_market_status_payload() @app.get("/api/market-cap") - async def api_get_market_cap( + def api_get_market_cap( ticker: str = Query(..., min_length=1), end_date: str = Query(...), ) -> dict[str, Any]: @@ -108,7 +108,7 @@ def create_app() -> FastAPI: ) @app.get("/api/line-items", response_model=LineItemResponse) - async def api_get_line_items( + def api_get_line_items( ticker: str = Query(..., min_length=1), line_items: list[str] = Query(...), end_date: str = Query(...), diff --git a/backend/core/pipeline.py b/backend/core/pipeline.py index a3c49eb..55cadc3 100644 --- a/backend/core/pipeline.py +++ b/backend/core/pipeline.py @@ -144,7 +144,7 @@ class TradingPipeline: self._team_controller = DynamicTeamController( create_callback=self._create_runtime_analyst, remove_callback=self._remove_runtime_analyst, - get_analysts_callback=self._all_analysts, + get_analysts_callback=lambda: self._all_analysts() + [self.risk_manager, self.pm], ) set_controller(self._team_controller) diff --git a/backend/core/state_sync.py b/backend/core/state_sync.py index 1f4c3c6..26ba614 100644 --- a/backend/core/state_sync.py +++ b/backend/core/state_sync.py @@ -123,7 +123,11 @@ class StateSync: # Persist to feed_history if persist: self.storage.add_feed_message(self._state, event) - self.save_state() + # Make persistence non-blocking to keep event loop snappy + if asyncio.get_event_loop().is_running(): + asyncio.create_task(asyncio.to_thread(self.save_state)) + else: + self.save_state() # Broadcast to frontend if self._broadcast_fn: diff --git a/backend/data/market_store.py b/backend/data/market_store.py index f5af092..d0ee83e 100644 --- a/backend/data/market_store.py +++ b/backend/data/market_store.py @@ -190,8 +190,9 @@ class MarketStore: name: str | None = None, sector: str | None = None, is_active: bool = True, - ) -> None: + ) -> int: timestamp = _utc_timestamp() + count = 0 with self._connect() as conn: conn.execute( """ @@ -206,6 +207,8 @@ class MarketStore: """, (symbol, name, sector, 1 if is_active else 0, timestamp, timestamp), ) + count += 1 + return count def update_fetch_watermark( self, @@ -213,8 +216,9 @@ class MarketStore: symbol: str, price_date: str | None = None, news_date: str | None = None, - ) -> None: + ) -> int: timestamp = _utc_timestamp() + count = 0 with self._connect() as conn: conn.execute( """ @@ -227,6 +231,8 @@ class MarketStore: """, (symbol, timestamp, timestamp, price_date, news_date), ) + count += 1 + return count def get_ticker_watermarks(self, symbol: str) -> dict[str, Any]: with self._connect() as conn: @@ -263,6 +269,8 @@ class MarketStore: count = 0 with self._connect() as conn: for row in rows: + if not row.get("date"): + continue conn.execute( """ INSERT INTO ohlc @@ -341,6 +349,7 @@ class MarketStore: timestamp, ), ) + count += 1 for ticker in tickers: conn.execute( """ @@ -349,7 +358,6 @@ class MarketStore: """, (news_id, str(ticker).strip().upper()), ) - count += 1 return count def get_news_without_trade_date(self, symbol: str | None = None, *, limit: int = 5000) -> list[dict[str, Any]]: @@ -928,8 +936,9 @@ class MarketStore: as_of_date: str, content: str, source: str = "local", - ) -> None: + ) -> int: timestamp = _utc_timestamp() + count = 0 with self._connect() as conn: conn.execute( """ @@ -943,6 +952,8 @@ class MarketStore: """, (symbol, as_of_date, content, source, timestamp, timestamp), ) + count += 1 + return count def delete_story_cache( self, @@ -1002,8 +1013,9 @@ class MarketStore: target_date: str, payload: dict[str, Any], source: str = "local", - ) -> None: + ) -> int: timestamp = _utc_timestamp() + count = 0 with self._connect() as conn: conn.execute( """ @@ -1017,6 +1029,8 @@ class MarketStore: """, (symbol, target_date, _json_dumps(payload), source, timestamp, timestamp), ) + count += 1 + return count def delete_similar_day_cache( self, diff --git a/backend/llm/models.py b/backend/llm/models.py index d9f8da7..0c8d264 100644 --- a/backend/llm/models.py +++ b/backend/llm/models.py @@ -444,6 +444,16 @@ def create_model( """ provider = canonicalize_model_provider(provider) + # If provider is default OPENAI but model name looks like deepseek, + # check if we should switch to DASHSCOPE. + if provider == "OPENAI" and "deepseek" in model_name.lower() and os.getenv("DASHSCOPE_API_KEY"): + provider = "DASHSCOPE" + + # Intelligent routing: if it's a DeepSeek model and we have DashScope credentials, + # prefer using DashScopeChatModel over OpenAIChatModel. + if provider == "DEEPSEEK" and os.getenv("DASHSCOPE_API_KEY"): + provider = "DASHSCOPE" + model_class = PROVIDER_MODEL_MAP.get(provider) if model_class is None: raise ValueError(f"Unsupported provider: {provider}") diff --git a/backend/runtime/agent_runtime.py b/backend/runtime/agent_runtime.py index ec41074..7e039ce 100644 --- a/backend/runtime/agent_runtime.py +++ b/backend/runtime/agent_runtime.py @@ -1,7 +1,7 @@ from __future__ import annotations from dataclasses import dataclass, field -from datetime import datetime, UTC +from datetime import datetime, timezone from typing import Any, Dict @@ -11,12 +11,12 @@ class AgentRuntimeState: display_name: str | None = None status: str = "idle" last_session: str | None = None - last_updated: datetime = field(default_factory=lambda: datetime.now(UTC)) + last_updated: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) def update(self, status: str, session_key: str | None = None) -> None: self.status = status self.last_session = session_key - self.last_updated = datetime.now(UTC) + self.last_updated = datetime.now(timezone.utc) def to_dict(self) -> Dict[str, Any]: return { diff --git a/backend/runtime/manager.py b/backend/runtime/manager.py index 79c1c11..e9fff87 100644 --- a/backend/runtime/manager.py +++ b/backend/runtime/manager.py @@ -2,7 +2,7 @@ from __future__ import annotations import asyncio import json -from datetime import datetime, UTC +from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional @@ -93,7 +93,7 @@ class TradingRuntimeManager: def log_event(self, event: str, details: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: entry = { - "timestamp": datetime.now(UTC).isoformat(), + "timestamp": datetime.now(timezone.utc).isoformat(), "event": event, "details": details or {}, "session": self.current_session_key, @@ -120,7 +120,7 @@ class TradingRuntimeManager: def register_pending_approval(self, approval_id: str, payload: Dict[str, Any]) -> None: payload.setdefault("status", "pending") - payload.setdefault("created_at", datetime.now(UTC).isoformat()) + payload.setdefault("created_at", datetime.now(timezone.utc).isoformat()) self.pending_approvals[approval_id] = payload self._persist_snapshot() @@ -149,7 +149,7 @@ class TradingRuntimeManager: if not entry: return entry["status"] = status - entry["resolved_at"] = datetime.now(UTC).isoformat() + entry["resolved_at"] = datetime.now(timezone.utc).isoformat() entry["resolved_by"] = resolved_by self._persist_snapshot() diff --git a/backend/services/gateway.py b/backend/services/gateway.py index 3f24fc9..2c29960 100644 --- a/backend/services/gateway.py +++ b/backend/services/gateway.py @@ -148,8 +148,9 @@ class Gateway: self.handle_client, host, port, - ping_interval=30, - ping_timeout=60, + ping_interval=20, + ping_timeout=120, + max_size=10 * 1024 * 1024, # 10MB ) logger.info(f"WebSocket server ready: ws://{host}:{port}") @@ -833,12 +834,18 @@ class Gateway: if not self.connected_clients: return - message_json = json.dumps(message, ensure_ascii=False, default=str) + # Offload potentially heavy JSON serialization to thread + message_json = await asyncio.to_thread( + json.dumps, message, ensure_ascii=False, default=str + ) async with self.lock: + # Filter only active clients to minimize unnecessary send attempts + # In websockets v13+, we must check state.name == 'OPEN' + active_clients = [c for c in self.connected_clients if c.state.name == 'OPEN'] tasks = [ self._send_to_client(client, message_json) - for client in self.connected_clients.copy() + for client in active_clients ] if tasks: @@ -849,9 +856,14 @@ class Gateway: client: ServerConnection, message: str, ): + if client.state.name != 'OPEN': + async with self.lock: + self.connected_clients.discard(client) + return + try: await client.send(message) - except websockets.ConnectionClosed: + except (websockets.ConnectionClosed, Exception): async with self.lock: self.connected_clients.discard(client) diff --git a/backend/services/gateway_cycle_support.py b/backend/services/gateway_cycle_support.py index 4b1dbf9..dab62e8 100644 --- a/backend/services/gateway_cycle_support.py +++ b/backend/services/gateway_cycle_support.py @@ -253,7 +253,8 @@ async def finalize_cycle(gateway: Any, date: str) -> None: async def get_market_caps(gateway: Any, tickers: list[str], date: str) -> dict[str, float]: market_caps: dict[str, float] = {} - for ticker in tickers: + + async def _get_one(ticker: str): try: market_cap = None response = await gateway._call_trading_service( @@ -263,12 +264,21 @@ async def get_market_caps(gateway: Any, tickers: list[str], date: str) -> dict[s if response is not None: market_cap = response.get("market_cap") if market_cap is None: - payload = trading_domain.get_market_cap_payload(ticker=ticker, end_date=date) + payload = await asyncio.to_thread( + trading_domain.get_market_cap_payload, + ticker=ticker, + end_date=date, + ) market_cap = payload.get("market_cap") - market_caps[ticker] = market_cap if market_cap else 1e9 + return ticker, (market_cap if market_cap else 1e9) except Exception as exc: logger.warning("Failed to get market cap for %s, using default 1e9: %s", ticker, exc) - market_caps[ticker] = 1e9 + return ticker, 1e9 + + tasks = [_get_one(ticker) for ticker in tickers] + results = await asyncio.gather(*tasks) + for ticker, mc in results: + market_caps[ticker] = mc return market_caps diff --git a/backend/services/gateway_stock_handlers.py b/backend/services/gateway_stock_handlers.py index 0db9168..84a3b2b 100644 --- a/backend/services/gateway_stock_handlers.py +++ b/backend/services/gateway_stock_handlers.py @@ -517,111 +517,129 @@ async def handle_get_stock_similar_days(gateway: Any, websocket: Any, data: dict async def handle_get_stock_technical_indicators(gateway: Any, websocket: Any, data: dict[str, Any]) -> None: - ticker = normalize_symbol(data.get("ticker", "")) - if not ticker: - await websocket.send(json.dumps({ - "type": "stock_technical_indicators_loaded", - "ticker": ticker, - "indicators": None, - "error": "ticker is required", - }, ensure_ascii=False)) - return + ticker = normalize_symbol(data.get("ticker", "")) + if not ticker: + await websocket.send(json.dumps({ + "type": "stock_technical_indicators_loaded", + "ticker": ticker, + "indicators": None, + "error": "ticker is required", + }, ensure_ascii=False)) + return - try: - end_date = datetime.now() - start_date = end_date - timedelta(days=250) + try: + end_date = datetime.now() + # Reduced from 250 to 150 days to lower CPU/memory pressure while still supporting MA200 (approx 140 trading days) + start_date = end_date - timedelta(days=150) - prices = None - response = await gateway._call_trading_service( - "get_prices", - lambda client: client.get_prices( - ticker=ticker, - start_date=start_date.strftime("%Y-%m-%d"), - end_date=end_date.strftime("%Y-%m-%d"), - ), - ) - if response is not None: - prices = response.prices + prices = None + response = await gateway._call_trading_service( + "get_prices", + lambda client: client.get_prices( + ticker=ticker, + start_date=start_date.strftime("%Y-%m-%d"), + end_date=end_date.strftime("%Y-%m-%d"), + ), + ) + if response is not None: + prices = response.prices - if prices is None: - payload = trading_domain.get_prices_payload( - ticker=ticker, - start_date=start_date.strftime("%Y-%m-%d"), - end_date=end_date.strftime("%Y-%m-%d"), - ) - prices = payload.get("prices") or [] + if prices is None: + # Offload domain logic to thread + payload = await asyncio.to_thread( + trading_domain.get_prices_payload, + ticker=ticker, + start_date=start_date.strftime("%Y-%m-%d"), + end_date=end_date.strftime("%Y-%m-%d"), + ) + prices = payload.get("prices") or [] - if not prices or len(prices) < 20: - await websocket.send(json.dumps({ - "type": "stock_technical_indicators_loaded", - "ticker": ticker, - "indicators": None, - "error": "Insufficient price data", - }, ensure_ascii=False)) - return + if not prices or len(prices) < 20: + await websocket.send(json.dumps({ + "type": "stock_technical_indicators_loaded", + "ticker": ticker, + "indicators": None, + "error": "Insufficient price data", + }, ensure_ascii=False)) + return - df = prices_to_df(prices) - signal = gateway._technical_analyzer.analyze(ticker, df) + def _calc(): + df = prices_to_df(prices) + signal = gateway._technical_analyzer.analyze(ticker, df) + df_sorted = df.sort_values("time").reset_index(drop=True) + df_sorted["returns"] = df_sorted["close"].pct_change() + v10 = float(df_sorted["returns"].tail(10).std() * (252**0.5) * 100) if len(df_sorted) >= 10 else None + v20 = float(df_sorted["returns"].tail(20).std() * (252**0.5) * 100) if len(df_sorted) >= 20 else None + v60 = float(df_sorted["returns"].tail(60).std() * (252**0.5) * 100) if len(df_sorted) >= 60 else None - df_sorted = df.sort_values("time").reset_index(drop=True) - df_sorted["returns"] = df_sorted["close"].pct_change() - vol_10 = float(df_sorted["returns"].tail(10).std() * (252**0.5) * 100) if len(df_sorted) >= 10 else None - vol_20 = float(df_sorted["returns"].tail(20).std() * (252**0.5) * 100) if len(df_sorted) >= 20 else None - vol_60 = float(df_sorted["returns"].tail(60).std() * (252**0.5) * 100) if len(df_sorted) >= 60 else None - ma_distance = {} - for ma_key in ["ma5", "ma10", "ma20", "ma50", "ma200"]: - ma_value = getattr(signal, ma_key, None) - ma_distance[ma_key] = ((signal.current_price - ma_value) / ma_value) * 100 if ma_value and ma_value > 0 else None + ma_dist = {} + for ma_key in ["ma5", "ma10", "ma20", "ma50", "ma200"]: + ma_val = getattr(signal, ma_key, None) + ma_dist[ma_key] = ((signal.current_price - ma_val) / ma_val) * 100 if ma_val and ma_val > 0 else None - indicators = { - "ticker": ticker, - "current_price": signal.current_price, - "ma": { - "ma5": signal.ma5, - "ma10": signal.ma10, - "ma20": signal.ma20, - "ma50": signal.ma50, - "ma200": signal.ma200, - "distance": ma_distance, - }, - "rsi": { - "rsi14": signal.rsi14, - "status": "oversold" if signal.rsi14 < 30 else "overbought" if signal.rsi14 > 70 else "neutral", - }, - "macd": { - "macd": signal.macd, - "signal": signal.macd_signal, - "histogram": signal.macd - signal.macd_signal, - }, - "bollinger": { - "upper": signal.bollinger_upper, - "mid": signal.bollinger_mid, - "lower": signal.bollinger_lower, - }, - "volatility": { - "vol_10d": vol_10, - "vol_20d": vol_20, - "vol_60d": vol_60, - "annualized": signal.annualized_volatility_pct, - "risk_level": signal.risk_level, - }, - "trend": signal.trend, - "mean_reversion": signal.mean_reversion_signal, - } + return { + "ticker": ticker, + "current_price": signal.current_price, + "ma": { + "ma5": signal.ma5, + "ma10": signal.ma10, + "ma20": signal.ma20, + "ma50": signal.ma50, + "ma200": signal.ma200, + "distance": ma_dist, + }, + "rsi": { + "rsi14": signal.rsi14, + "status": "oversold" if signal.rsi14 < 30 else "overbought" if signal.rsi14 > 70 else "neutral", + }, + "macd": { + "macd": signal.macd, + "signal": signal.macd_signal, + "histogram": signal.macd - signal.macd_signal, + }, + "bollinger": { + "upper": signal.bollinger_upper, + "mid": signal.bollinger_mid, + "lower": signal.bollinger_lower, + }, + "volatility": { + "vol_10d": v10, + "vol_20d": v20, + "vol_60d": v60, + "annualized": signal.annualized_volatility_pct, + "risk_level": signal.risk_level, + }, + "trend": signal.trend, + "mean_reversion": signal.mean_reversion_signal, + } - await websocket.send(json.dumps({ - "type": "stock_technical_indicators_loaded", - "ticker": ticker, - "indicators": indicators, - }, ensure_ascii=False, default=str)) - except Exception as exc: - logger.exception("Error getting technical indicators for %s", ticker) - await websocket.send(json.dumps({ - "type": "stock_technical_indicators_loaded", - "ticker": ticker, - "indicators": None, - "error": str(exc), - }, ensure_ascii=False)) + # Use a semaphore to prevent too many concurrent CPU-intensive calculations + # which can block the event loop heartbeats. + if not hasattr(gateway, "_calc_sem"): + gateway._calc_sem = asyncio.Semaphore(3) + + async with gateway._calc_sem: + indicators = await asyncio.to_thread(_calc) + + # Also offload JSON serialization to thread to avoid blocking main loop + msg = await asyncio.to_thread(json.dumps, { + "type": "stock_technical_indicators_loaded", + "ticker": ticker, + "indicators": indicators, + }, ensure_ascii=False, default=str) + + if websocket.state.name == 'OPEN': + await websocket.send(msg) + else: + logger.warning("Websocket closed for %s, skipping indicator send", ticker) + except Exception as exc: + logger.exception("Error getting technical indicators for %s", ticker) + await websocket.send(json.dumps({ + "type": "stock_technical_indicators_loaded", + "ticker": ticker, + "indicators": None, + "error": str(exc), + }, ensure_ascii=False)) async def handle_run_stock_enrich(gateway: Any, websocket: Any, data: dict[str, Any]) -> None: diff --git a/backend/services/storage.py b/backend/services/storage.py index 7ae1196..a4005df 100644 --- a/backend/services/storage.py +++ b/backend/services/storage.py @@ -7,6 +7,7 @@ Handles reading/writing dashboard JSON files and portfolio state import json import logging import os +import time from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional @@ -950,11 +951,14 @@ class StorageService: def save_server_state(self, state: Dict[str, Any]): """ - Save server state to file - - Args: - state: Server state dictionary + Save server state to file with rate-limiting to avoid I/O storms. """ + now = time.time() + # Ensure at least 2 seconds between physical disk writes + if hasattr(self, "_last_save_time") and (now - self._last_save_time) < 2.0: + return + self._last_save_time = now + state_to_save = { **state, "last_saved": datetime.now().isoformat(), @@ -970,14 +974,17 @@ class StorageService: if "trades" in state_to_save: state_to_save["trades"] = state_to_save["trades"][:100] - with open(self.server_state_file, "w", encoding="utf-8") as f: - json.dump( - state_to_save, - f, - ensure_ascii=False, - indent=2, - default=str, - ) + try: + with open(self.server_state_file, "w", encoding="utf-8") as f: + # Removed indent=2 to minimize file size and serialization overhead + json.dump( + state_to_save, + f, + ensure_ascii=False, + default=str, + ) + except Exception as e: + logger.error(f"Failed to save server state: {e}") logger.debug(f"Server state saved to: {self.server_state_file}") diff --git a/deploy/README.md b/deploy/README.md index 148f891..b3458b8 100644 --- a/deploy/README.md +++ b/deploy/README.md @@ -83,7 +83,7 @@ Before using the production scripts, ensure the runtime environment has: - a usable Python environment - backend dependencies installed from the checked-in Python package metadata in `pyproject.toml` - the package installed with `pip install -e .` or `uv pip install -e .` -- frontend dependencies installed with `npm ci` +- frontend dependencies installed with `npm install` - repo dependencies installed - required market/model API keys - any desired `TICKERS` override @@ -94,7 +94,7 @@ Recommended production install sequence: python3 -m venv .venv source .venv/bin/activate pip install -e . -cd frontend && npm ci && npm run build && cd .. +cd frontend && npm install && npm run build && cd .. ``` ## Skill Sandbox Configuration diff --git a/deploy/install-production.sh b/deploy/install-production.sh old mode 100644 new mode 100755 index 318a900..2cd2df4 --- a/deploy/install-production.sh +++ b/deploy/install-production.sh @@ -166,7 +166,7 @@ KillMode=mixed NoNewPrivileges=true PrivateTmp=true ProtectSystem=full -ProtectHome=true +ProtectHome=false LimitNOFILE=65535 TasksMax=4096 MemoryMax=${memory_max} @@ -477,7 +477,17 @@ main() { SERVICE_GROUP="${SERVICE_GROUP:-$(ask_required 'systemd 运行用户组' "$(id -gn)")}" - DOMAIN="${DOMAIN:-$(ask_required '部署域名(可填写 IP 或 localhost)' 'localhost')}" + # 自动尝试获取公网 IP 作为默认域名值 + local detected_ip="" + if [[ -z "${DOMAIN:-}" ]]; then + log "正在尝试自动获取公网 IP..." + detected_ip=$(curl -s --connect-timeout 5 https://ifconfig.me || curl -s --connect-timeout 5 https://api.ipify.org || echo "") + if [[ -n "${detected_ip}" ]]; then + log "自动检测到公网 IP: ${detected_ip}" + fi + fi + + DOMAIN="${DOMAIN:-$(ask_required '部署域名(可填写 IP 或 localhost)' "${detected_ip:-localhost}")}" validate_domain_like "${DOMAIN}" || warn "域名/IP 形态看起来不标准,请再次确认: ${DOMAIN}" ENV_FILE="${ENV_FILE:-$(ask_required '环境变量文件路径' '/etc/bigtime/bigtime.env')}" @@ -486,53 +496,65 @@ main() { PYTHON_BIN="${PYTHON_BIN:-$(ask 'Python 可执行文件路径' "${APP_DIR}/.venv/bin/python")}" [[ -n "${PYTHON_BIN}" ]] || fail "Python 路径不能为空" - echo "" - echo -e "${CYAN}运行参数${NC}" - TICKERS="${TICKERS:-$(ask '默认股票池(逗号分隔)' 'AAPL,MSFT,GOOGL,AMZN,NVDA,META,TSLA,AMD,NFLX,AVGO,PLTR,COIN')}" - FIN_DATA_SOURCE="${FIN_DATA_SOURCE:-$(ask '行情数据源(finnhub/yfinance/financial_datasets)' 'finnhub')}" - MODEL_NAME="${MODEL_NAME:-$(ask '默认模型名' 'qwen3-max')}" - MAX_COMM_CYCLES="${MAX_COMM_CYCLES:-$(ask_required '最大讨论轮数' '2')}" - validate_numeric "${MAX_COMM_CYCLES}" || fail "最大讨论轮数必须是数字: ${MAX_COMM_CYCLES}" - MARGIN_REQUIREMENT="${MARGIN_REQUIREMENT:-$(ask_required '保证金比例' '0.5')}" - validate_numeric "${MARGIN_REQUIREMENT}" || fail "保证金比例必须是数字: ${MARGIN_REQUIREMENT}" - - echo "" - echo -e "${CYAN}密钥配置${NC}" - FINANCIAL_DATASETS_API_KEY="${FINANCIAL_DATASETS_API_KEY:-$(ask 'FINANCIAL_DATASETS_API_KEY(可留空)' '')}" - FINNHUB_API_KEY="${FINNHUB_API_KEY:-$(ask 'FINNHUB_API_KEY(live 模式建议填写)' '')}" - POLYGON_API_KEY="${POLYGON_API_KEY:-$(ask 'POLYGON_API_KEY(可留空)' '')}" - OPENAI_API_KEY="${OPENAI_API_KEY:-$(ask 'OPENAI_API_KEY(可留空)' '')}" - OPENAI_BASE_URL="${OPENAI_BASE_URL:-$(ask 'OPENAI_BASE_URL(可留空)' '')}" - DASHSCOPE_API_KEY="${DASHSCOPE_API_KEY:-$(ask 'DASHSCOPE_API_KEY(可留空)' '')}" - MEMORY_API_KEY="${MEMORY_API_KEY:-$(ask 'MEMORY_API_KEY(可留空)' '')}" - - if [[ "${FIN_DATA_SOURCE}" == "finnhub" && -z "${FINNHUB_API_KEY}" ]]; then - warn "你选择了 finnhub 作为数据源,但 FINNHUB_API_KEY 为空。live 模式下通常会失败。" - fi - if [[ -z "${OPENAI_API_KEY}" && -z "${DASHSCOPE_API_KEY}" ]]; then - warn "OPENAI_API_KEY 和 DASHSCOPE_API_KEY 都为空,模型调用可能无法工作。" + local SKIP_ENV_CONFIG=false + if [[ -f "${ENV_FILE}" ]]; then + echo "" + if confirm "检测到环境变量文件 ${ENV_FILE} 已存在,是否跳过详细参数配置并保留现有文件?" "Y"; then + SKIP_ENV_CONFIG=true + fi fi - if confirm "使用 Docker 沙盒执行技能?" "N" "${AUTO_USE_DOCKER}"; then - SKILL_SANDBOX_MODE="docker" + if ! ${SKIP_ENV_CONFIG}; then + echo "" + echo -e "${CYAN}运行参数${NC}" + TICKERS="${TICKERS:-$(ask '默认股票池(逗号分隔)' 'AAPL,MSFT,GOOGL,AMZN,NVDA,META,TSLA,AMD,NFLX,AVGO,PLTR,COIN')}" + FIN_DATA_SOURCE="${FIN_DATA_SOURCE:-$(ask '行情数据源(finnhub/yfinance/financial_datasets)' 'finnhub')}" + MODEL_NAME="${MODEL_NAME:-$(ask '默认模型名' 'qwen3-max')}" + MAX_COMM_CYCLES="${MAX_COMM_CYCLES:-$(ask_required '最大讨论轮数' '2')}" + validate_numeric "${MAX_COMM_CYCLES}" || fail "最大讨论轮数必须是数字: ${MAX_COMM_CYCLES}" + MARGIN_REQUIREMENT="${MARGIN_REQUIREMENT:-$(ask_required '保证金比例' '0.5')}" + validate_numeric "${MARGIN_REQUIREMENT}" || fail "保证金比例必须是数字: ${MARGIN_REQUIREMENT}" + + echo "" + echo -e "${CYAN}密钥配置${NC}" + FINANCIAL_DATASETS_API_KEY="${FINANCIAL_DATASETS_API_KEY:-$(ask 'FINANCIAL_DATASETS_API_KEY(可留空)' '')}" + FINNHUB_API_KEY="${FINNHUB_API_KEY:-$(ask 'FINNHUB_API_KEY(live 模式建议填写)' '')}" + POLYGON_API_KEY="${POLYGON_API_KEY:-$(ask 'POLYGON_API_KEY(可留空)' '')}" + OPENAI_API_KEY="${OPENAI_API_KEY:-$(ask 'OPENAI_API_KEY(可留空)' '')}" + OPENAI_BASE_URL="${OPENAI_BASE_URL:-$(ask 'OPENAI_BASE_URL(可留空)' '')}" + DASHSCOPE_API_KEY="${DASHSCOPE_API_KEY:-$(ask 'DASHSCOPE_API_KEY(可留空)' '')}" + MEMORY_API_KEY="${MEMORY_API_KEY:-$(ask 'MEMORY_API_KEY(可留空)' '')}" + + if [[ "${FIN_DATA_SOURCE}" == "finnhub" && -z "${FINNHUB_API_KEY}" ]]; then + warn "你选择了 finnhub 作为数据源,但 FINNHUB_API_KEY 为空。live 模式下通常会失败。" + fi + if [[ -z "${OPENAI_API_KEY}" && -z "${DASHSCOPE_API_KEY}" ]]; then + warn "OPENAI_API_KEY 和 DASHSCOPE_API_KEY 都为空,模型调用可能无法工作。" + fi + + if confirm "使用 Docker 沙盒执行技能?" "N" "${AUTO_USE_DOCKER}"; then + SKILL_SANDBOX_MODE="docker" + else + SKILL_SANDBOX_MODE="none" + fi + + echo "" + echo -e "${CYAN}当前部署摘要${NC}" + echo " 应用目录: ${APP_DIR}" + echo " 运行用户: ${SERVICE_USER}:${SERVICE_GROUP}" + echo " 域名: ${DOMAIN}" + echo " 环境文件: ${ENV_FILE}" + echo " Python: ${PYTHON_BIN}" + echo " 数据源: ${FIN_DATA_SOURCE:-}" + echo " 模型: ${MODEL_NAME:-}" + echo " 沙盒模式: ${SKILL_SANDBOX_MODE:-none}" + echo "" + + if ! confirm "确认以上配置并继续写入系统文件?" "Y"; then + fail "用户取消部署。" + fi else - SKILL_SANDBOX_MODE="none" - fi - - echo "" - echo -e "${CYAN}当前部署摘要${NC}" - echo " 应用目录: ${APP_DIR}" - echo " 运行用户: ${SERVICE_USER}:${SERVICE_GROUP}" - echo " 域名: ${DOMAIN}" - echo " 环境文件: ${ENV_FILE}" - echo " Python: ${PYTHON_BIN}" - echo " 数据源: ${FIN_DATA_SOURCE}" - echo " 模型: ${MODEL_NAME}" - echo " 沙盒模式: ${SKILL_SANDBOX_MODE}" - echo "" - - if ! confirm "确认以上配置并继续写入系统文件?" "Y"; then - fail "用户取消部署。" + echo -e "${GREEN}将使用现有的环境文件,跳过详细参数配置。${NC}" fi if [[ ! -x "${PYTHON_BIN}" ]]; then @@ -546,10 +568,12 @@ main() { "${PYTHON_BIN}" -m pip install -e "${APP_DIR}" log "构建前端" - (cd "${APP_DIR}/frontend" && npm ci && npm run build) + (cd "${APP_DIR}/frontend" && npm install && npm run build) - log "写入环境变量文件 ${ENV_FILE}" - write_env_file + if ! ${SKIP_ENV_CONFIG}; then + log "写入环境变量文件 ${ENV_FILE}" + write_env_file + fi if confirm "生成并安装 systemd unit?" "Y" "${AUTO_INSTALL_SYSTEMD}"; then render_systemd_unit "Agent Service" "backend.apps.agent_service:app" "8000" "1" "1024M" "/etc/systemd/system/bigtime-agent.service" @@ -568,20 +592,50 @@ main() { if confirm "生成并安装 nginx 配置?" "Y" "${AUTO_INSTALL_NGINX}"; then local use_tls="no" if confirm "使用 HTTPS/Let's Encrypt 证书路径?" "N" "${AUTO_USE_TLS}"; then - use_tls="yes" SSL_CERT_PATH="${SSL_CERT_PATH:-$(ask_required 'SSL 证书 fullchain.pem 路径' "/etc/letsencrypt/live/${DOMAIN}/fullchain.pem")}" SSL_KEY_PATH="${SSL_KEY_PATH:-$(ask_required 'SSL 私钥 privkey.pem 路径' "/etc/letsencrypt/live/${DOMAIN}/privkey.pem")}" - [[ -f "${SSL_CERT_PATH}" ]] || warn "证书文件当前不存在: ${SSL_CERT_PATH}" - [[ -f "${SSL_KEY_PATH}" ]] || warn "私钥文件当前不存在: ${SSL_KEY_PATH}" + + local ssl_err=0 + [[ -f "${SSL_CERT_PATH}" ]] || { warn "SSL 证书文件不存在: ${SSL_CERT_PATH}"; ssl_err=1; } + [[ -f "${SSL_KEY_PATH}" ]] || { warn "SSL 私钥文件不存在: ${SSL_KEY_PATH}"; ssl_err=1; } + [[ -f "/etc/letsencrypt/options-ssl-nginx.conf" ]] || { warn "缺失 /etc/letsencrypt/options-ssl-nginx.conf,请检查 certbot 配置"; ssl_err=1; } + [[ -f "/etc/letsencrypt/ssl-dhparams.pem" ]] || { warn "缺失 /etc/letsencrypt/ssl-dhparams.pem,请检查 certbot 配置"; ssl_err=1; } + + if [[ ${ssl_err} -eq 0 ]]; then + use_tls="yes" + else + warn "由于 SSL 关键文件缺失,将回退至 HTTP 模式,以确保 Nginx 能通过配置检查。" + use_tls="no" + fi else SSL_CERT_PATH="" SSL_KEY_PATH="" fi NGINX_TARGET="/etc/nginx/conf.d/bigtime.conf" render_nginx_conf "${NGINX_TARGET}" "${use_tls}" - if confirm "立即执行 nginx -t 并 reload?" "Y" "${AUTO_RELOAD_NGINX}"; then - sudo nginx -t - sudo systemctl reload nginx + if confirm "立即执行 nginx -t 并生效配置?" "Y" "${AUTO_RELOAD_NGINX}"; then + log "正在验证 Nginx 配置..." + if ! sudo nginx -t; then + fail "Nginx 配置检查失败!请根据上方报错信息调整。常见的错误包括:80/443 端口被占用,或 server_name 冲突。" + fi + + if systemctl is-active --quiet nginx; then + log "Nginx 正在运行,执行 reload..." + sudo systemctl reload nginx + else + log "Nginx 未运行,尝试启动..." + sudo systemctl enable --now nginx + fi + + # 关键修复:确保 nginx 用户对 /root 路径有 x 权限 + if [[ "${APP_DIR}" == /root/* ]]; then + log "检测到应用部署在 /root 下,正在修复父目录访问权限..." + sudo chmod o+x /root 2>/dev/null || true + sudo chmod o+x "$(dirname "${APP_DIR}")" 2>/dev/null || true + sudo chmod -R o+rX "${APP_DIR}" + fi + + log "Nginx 配置已生效。" fi fi diff --git a/deploy/production-deployment.md b/deploy/production-deployment.md index 3e5cf53..b73df6f 100644 --- a/deploy/production-deployment.md +++ b/deploy/production-deployment.md @@ -42,7 +42,7 @@ Recommended frontend mode: ```bash cd frontend -npm ci +npm install npm run build ``` diff --git a/deploy/systemd/README.md b/deploy/systemd/README.md index 5b1a6e9..c1f42f7 100644 --- a/deploy/systemd/README.md +++ b/deploy/systemd/README.md @@ -40,7 +40,7 @@ sudo systemctl enable --now bigtime-runtime.service Recommended production frontend mode: -- build with `cd frontend && npm ci && npm run build` +- build with `cd frontend && npm install && npm run build` - let `nginx` serve `frontend/dist` directly The repository also contains `backend.apps.frontend_service`, but for diff --git a/shared/client/news_client.py b/shared/client/news_client.py index 96c9481..16f030a 100644 --- a/shared/client/news_client.py +++ b/shared/client/news_client.py @@ -17,7 +17,11 @@ class NewsServiceClient: self._client: httpx.AsyncClient | None = None async def __aenter__(self) -> "NewsServiceClient": - self._client = httpx.AsyncClient(base_url=self.base_url, timeout=30.0) + self._client = httpx.AsyncClient( + base_url=self.base_url, + timeout=90.0, + limits=httpx.Limits(max_connections=100, max_keepalive_connections=20) + ) return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: diff --git a/shared/client/trading_client.py b/shared/client/trading_client.py index 7879c98..97dfad2 100644 --- a/shared/client/trading_client.py +++ b/shared/client/trading_client.py @@ -21,7 +21,11 @@ class TradingServiceClient: self._client: httpx.AsyncClient | None = None async def __aenter__(self) -> "TradingServiceClient": - self._client = httpx.AsyncClient(base_url=self.base_url, timeout=30.0) + self._client = httpx.AsyncClient( + base_url=self.base_url, + timeout=60.0, + limits=httpx.Limits(max_connections=100, max_keepalive_connections=20) + ) return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: