From dfc8fda18779945ed387a9f7ecfaaf96d091f5bc Mon Sep 17 00:00:00 2001 From: cillin Date: Tue, 7 Apr 2026 14:59:41 +0800 Subject: [PATCH] perf: asynchronize and parallelize analysis tools to prevent main loop blocking --- backend/tools/analysis_tools.py | 156 ++++++++++++++++++-------------- 1 file changed, 89 insertions(+), 67 deletions(-) diff --git a/backend/tools/analysis_tools.py b/backend/tools/analysis_tools.py index 52d825c..c5fb772 100644 --- a/backend/tools/analysis_tools.py +++ b/backend/tools/analysis_tools.py @@ -103,18 +103,28 @@ def _safe_float(value, default=0.0) -> float: def safe(func): - """Decorator to catch exceptions in tool functions.""" + """Decorator to catch exceptions in both sync and async tool functions.""" - @wraps(func) - def wrapper(*args, **kwargs): - try: - return func(*args, **kwargs) - except Exception as e: - error_msg = f"Error in {func.__name__}: {str(e)}" - logger.error(f"{error_msg}\n{traceback.format_exc()}") - return _to_text_response(f"[ERROR] {error_msg}") - - return wrapper + if asyncio.iscoroutinefunction(func): + @wraps(func) + async def async_wrapper(*args, **kwargs): + try: + return await func(*args, **kwargs) + except Exception as e: + error_msg = f"Error in {func.__name__}: {str(e)}" + logger.error(f"{error_msg}\n{traceback.format_exc()}") + return _to_text_response(f"[ERROR] {error_msg}") + return async_wrapper + else: + @wraps(func) + def sync_wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except Exception as e: + error_msg = f"Error in {func.__name__}: {str(e)}" + logger.error(f"{error_msg}\n{traceback.format_exc()}") + return _to_text_response(f"[ERROR] {error_msg}") + return sync_wrapper def _fmt(val, fmt=".2f", suffix="") -> str: @@ -141,7 +151,7 @@ def _resolved_date(current_date: Optional[str]) -> str: @safe -def analyze_efficiency_ratios( +async def analyze_efficiency_ratios( tickers: Optional[List[str]] = None, current_date: Optional[str] = None, ) -> ToolResponse: @@ -163,21 +173,26 @@ def analyze_efficiency_ratios( tickers = _parse_tickers(tickers) lines = [f"=== Efficiency Ratios Analysis ({current_date}) ===\n"] - for ticker in tickers: - metrics = get_financial_metrics(ticker=ticker, end_date=current_date) - if not metrics: - lines.append(f"{ticker}: No data available\n") - continue + async def _fetch_one(ticker): + try: + metrics = await asyncio.to_thread(get_financial_metrics, ticker=ticker, end_date=current_date) + if not metrics: + return f"{ticker}: No data available\n" - m = metrics[0] - lines.append(f"{ticker}:") - lines.append(f" Asset Turnover: {_fmt(m.asset_turnover)}") - lines.append(f" Inventory Turnover: {_fmt(m.inventory_turnover)}") - lines.append(f" Receivables Turnover: {_fmt(m.receivables_turnover)}") - lines.append( - f" Working Capital Turnover: {_fmt(m.working_capital_turnover)}", - ) - lines.append("") + m = metrics[0] + ticker_lines = [ + f"{ticker}:", + f" Asset Turnover: {_fmt(m.asset_turnover)}", + f" Inventory Turnover: {_fmt(m.inventory_turnover)}", + f" Receivables Turnover: {_fmt(m.receivables_turnover)}", + f" Working Capital Turnover: {_fmt(m.working_capital_turnover)}\n", + ] + return "\n".join(ticker_lines) + except Exception as e: + return f"{ticker}: Error - {str(e)}\n" + + results = await asyncio.gather(*[_fetch_one(t) for t in tickers]) + lines.extend(results) return _to_text_response("\n".join(lines)) @@ -310,7 +325,7 @@ def analyze_financial_health( @safe -def analyze_valuation_ratios( +async def analyze_valuation_ratios( tickers: Optional[List[str]] = None, current_date: Optional[str] = None, ) -> ToolResponse: @@ -332,24 +347,31 @@ def analyze_valuation_ratios( tickers = _parse_tickers(tickers) lines = [f"=== Valuation Ratios Analysis ({current_date}) ===\n"] - for ticker in tickers: - metrics = get_financial_metrics(ticker=ticker, end_date=current_date) - if not metrics: - lines.append(f"{ticker}: No data available\n") - continue + async def _fetch_one(ticker): + try: + metrics = await asyncio.to_thread(get_financial_metrics, ticker=ticker, end_date=current_date) + if not metrics: + return f"{ticker}: No data available\n" - m = metrics[0] - lines.append(f"{ticker}:") - lines.append(f" P/E Ratio: {_fmt(m.price_to_earnings_ratio)}") - lines.append(f" P/B Ratio: {_fmt(m.price_to_book_ratio)}") - lines.append(f" P/S Ratio: {_fmt(m.price_to_sales_ratio)}") - lines.append("") + m = metrics[0] + ticker_lines = [ + f"{ticker}:", + f" P/E Ratio: {_fmt(m.price_to_earnings_ratio)}", + f" P/B Ratio: {_fmt(m.price_to_book_ratio)}", + f" P/S Ratio: {_fmt(m.price_to_sales_ratio)}\n", + ] + return "\n".join(ticker_lines) + except Exception as e: + return f"{ticker}: Error - {str(e)}\n" + + results = await asyncio.gather(*[_fetch_one(t) for t in tickers]) + lines.extend(results) return _to_text_response("\n".join(lines)) @safe -def get_financial_metrics_tool( +async def get_financial_metrics_tool( tickers: Optional[List[str]] = None, current_date: Optional[str] = None, period: str = "ttm", @@ -374,35 +396,35 @@ def get_financial_metrics_tool( f"=== Comprehensive Financial Metrics ({current_date}, {period}) ===\n", ] - for ticker in tickers: - metrics = get_financial_metrics( - ticker=ticker, - end_date=current_date, - period=period, - ) - if not metrics: - lines.append(f"{ticker}: No data available\n") - continue + async def _fetch_one(ticker): + try: + # Offload synchronous data fetching to thread to keep loop snappy + metrics = await asyncio.to_thread( + get_financial_metrics, + ticker=ticker, + end_date=current_date, + period=period, + ) + if not metrics: + return f"{ticker}: No data available\n" - m = metrics[0] - lines.append(f"{ticker}:") - lines.append(f" Market Cap: ${_fmt(m.market_cap, ',.0f')}") - lines.append( - f" P/E: {_fmt(m.price_to_earnings_ratio)} | P/B: {_fmt(m.price_to_book_ratio)} | P/S: {_fmt(m.price_to_sales_ratio)}", - ) - lines.append( - f" ROE: {_fmt(m.return_on_equity, '.1%')} | Net Margin: {_fmt(m.net_margin, '.1%')}", - ) - lines.append( - f" Revenue Growth: {_fmt(m.revenue_growth, '.1%')} | Earnings Growth: {_fmt(m.earnings_growth, '.1%')}", - ) - lines.append( - f" Current Ratio: {_fmt(m.current_ratio)} | D/E: {_fmt(m.debt_to_equity)}", - ) - lines.append( - f" EPS: ${_fmt(m.earnings_per_share)} | FCF/Share: ${_fmt(m.free_cash_flow_per_share)}", - ) - lines.append("") + m = metrics[0] + ticker_lines = [ + f"{ticker}:", + f" Market Cap: ${_fmt(m.market_cap, ',.0f')}", + f" P/E: {_fmt(m.price_to_earnings_ratio)} | P/B: {_fmt(m.price_to_book_ratio)} | P/S: {_fmt(m.price_to_sales_ratio)}", + f" ROE: {_fmt(m.return_on_equity, '.1%')} | Net Margin: {_fmt(m.net_margin, '.1%')}", + f" Revenue Growth: {_fmt(m.revenue_growth, '.1%')} | Earnings Growth: {_fmt(m.earnings_growth, '.1%')}", + f" Current Ratio: {_fmt(m.current_ratio)} | D/E: {_fmt(m.debt_to_equity)}", + f" EPS: ${_fmt(m.earnings_per_share)} | FCF/Share: ${_fmt(m.free_cash_flow_per_share)}\n", + ] + return "\n".join(ticker_lines) + except Exception as e: + return f"{ticker}: Error fetching data - {str(e)}\n" + + # Parallelize data retrieval for all tickers + results = await asyncio.gather(*[_fetch_one(t) for t in tickers]) + lines.extend(results) return _to_text_response("\n".join(lines))