# -*- coding: utf-8 -*- """Risk management tools for the risk manager agent.""" import json from typing import Any, Dict, Iterable, List from agentscope.message import TextBlock from agentscope.tool import ToolResponse def _to_text_response(text: str) -> ToolResponse: return ToolResponse(content=[TextBlock(type="text", text=text)]) def _parse_object(payload: Any) -> Dict[str, Any]: if payload is None: return {} if isinstance(payload, dict): return payload if isinstance(payload, str): try: parsed = json.loads(payload) return parsed if isinstance(parsed, dict) else {} except json.JSONDecodeError: return {} return {} def _parse_prices(payload: Any) -> Dict[str, float]: parsed = _parse_object(payload) prices = {} for ticker, value in parsed.items(): try: prices[str(ticker)] = float(value) except (TypeError, ValueError): continue return prices def _iter_positions( portfolio: Dict[str, Any], prices: Dict[str, float], ) -> Iterable[Dict[str, Any]]: positions = portfolio.get("positions", {}) for ticker, raw_position in positions.items(): if not isinstance(raw_position, dict): continue price = prices.get(ticker, 0.0) long_qty = int(raw_position.get("long", 0) or 0) short_qty = int(raw_position.get("short", 0) or 0) long_value = long_qty * price short_value = short_qty * price net_value = long_value - short_value gross_value = long_value + short_value yield { "ticker": ticker, "price": price, "long_qty": long_qty, "short_qty": short_qty, "long_value": long_value, "short_value": short_value, "net_value": net_value, "gross_value": gross_value, } def _portfolio_equity(portfolio: Dict[str, Any], prices: Dict[str, float]) -> float: cash = float(portfolio.get("cash", 0.0) or 0.0) margin_used = float(portfolio.get("margin_used", 0.0) or 0.0) total = cash + margin_used for position in _iter_positions(portfolio, prices): total += position["net_value"] return total def assess_position_concentration( portfolio: Dict[str, Any] | str, current_prices: Dict[str, float] | str, ) -> ToolResponse: """ Assess single-name concentration and gross exposure in the current portfolio. Args: portfolio: Portfolio state with cash, positions, and margin fields. current_prices: Current price map by ticker. """ portfolio_obj = _parse_object(portfolio) prices = _parse_prices(current_prices) equity = _portfolio_equity(portfolio_obj, prices) if equity <= 0: return _to_text_response("Unable to assess concentration: portfolio equity is non-positive.") exposures: List[Dict[str, Any]] = sorted( _iter_positions(portfolio_obj, prices), key=lambda item: abs(item["net_value"]), reverse=True, ) if not exposures: return _to_text_response( "No open positions. Concentration risk is low because the portfolio is fully in cash." ) lines = ["=== Position Concentration Assessment ==="] gross_exposure = sum(item["gross_value"] for item in exposures) net_exposure = sum(item["net_value"] for item in exposures) lines.append(f"Portfolio equity: ${equity:,.2f}") lines.append(f"Gross exposure: ${gross_exposure:,.2f} ({gross_exposure / equity:.1%} of equity)") lines.append(f"Net exposure: ${net_exposure:,.2f} ({net_exposure / equity:.1%} of equity)") lines.append("Largest positions by net exposure:") for item in exposures[:5]: weight = item["net_value"] / equity gross_weight = item["gross_value"] / equity direction = "NET LONG" if item["net_value"] >= 0 else "NET SHORT" lines.append( f"- {item['ticker']}: {direction}, net ${item['net_value']:,.2f} ({weight:.1%}), " f"gross ${item['gross_value']:,.2f} ({gross_weight:.1%})" ) top_weight = abs(exposures[0]["net_value"]) / equity if top_weight >= 0.30: lines.append("Risk flag: concentration is HIGH because the largest single-name exposure exceeds 30% of equity.") elif top_weight >= 0.20: lines.append("Risk flag: concentration is MODERATE because the largest single-name exposure exceeds 20% of equity.") else: lines.append("Risk flag: concentration is currently contained at the single-name level.") return _to_text_response("\n".join(lines)) def assess_margin_and_liquidity( portfolio: Dict[str, Any] | str, current_prices: Dict[str, float] | str, ) -> ToolResponse: """ Assess available cash, margin usage, and short exposure pressure. Args: portfolio: Portfolio state with cash, positions, and margin fields. current_prices: Current price map by ticker. """ portfolio_obj = _parse_object(portfolio) prices = _parse_prices(current_prices) equity = _portfolio_equity(portfolio_obj, prices) cash = float(portfolio_obj.get("cash", 0.0) or 0.0) margin_used = float(portfolio_obj.get("margin_used", 0.0) or 0.0) margin_requirement = float(portfolio_obj.get("margin_requirement", 0.0) or 0.0) short_exposure = sum(item["short_value"] for item in _iter_positions(portfolio_obj, prices)) margin_buffer = cash - margin_used lines = ["=== Margin And Liquidity Assessment ==="] lines.append(f"Portfolio equity: ${equity:,.2f}") lines.append(f"Cash available: ${cash:,.2f}") lines.append(f"Margin used: ${margin_used:,.2f}") lines.append(f"Margin requirement: {margin_requirement:.1%}") lines.append(f"Short exposure: ${short_exposure:,.2f}") lines.append(f"Margin buffer (cash - used): ${margin_buffer:,.2f}") if equity > 0: lines.append(f"Margin used / equity: {margin_used / equity:.1%}") lines.append(f"Short exposure / equity: {short_exposure / equity:.1%}") if margin_buffer < 0: lines.append("Risk flag: HIGH. Margin usage exceeds available cash buffer.") elif equity > 0 and margin_used / equity > 0.35: lines.append("Risk flag: MODERATE to HIGH. Margin usage is above 35% of equity.") else: lines.append("Risk flag: margin pressure is currently manageable.") return _to_text_response("\n".join(lines)) def assess_volatility_exposure( tickers: List[str] | str, current_date: str | None = None, ) -> ToolResponse: """ Assess per-ticker volatility and risk level for the current watchlist. Args: tickers: List of stock tickers or JSON list string. current_date: Analysis date in YYYY-MM-DD format. """ from datetime import datetime, timedelta from backend.tools.analysis_tools import _parse_tickers, _resolved_date from backend.tools.data_tools import get_prices, prices_to_df from backend.tools.technical_signals import StockTechnicalAnalyzer tickers_list = _parse_tickers(tickers) current_date = _resolved_date(current_date) end_dt = datetime.strptime(current_date, "%Y-%m-%d") start_date = (end_dt - timedelta(days=90)).strftime("%Y-%m-%d") analyzer = StockTechnicalAnalyzer() lines = [f"=== Volatility Exposure Assessment ({current_date}) ==="] for ticker in tickers_list: prices = get_prices( ticker=ticker, start_date=start_date, end_date=current_date, ) if not prices or len(prices) < 5: lines.append(f"- {ticker}: insufficient price data") continue signal = analyzer.analyze(ticker=ticker, df=prices_to_df(prices)) lines.append( f"- {ticker}: annualized volatility {signal.annualized_volatility_pct:.1f}%, " f"RSI14 {signal.rsi14:.1f}, trend {signal.trend}, risk level {signal.risk_level}" ) if len(lines) == 1: lines.append("No tickers provided.") return _to_text_response("\n".join(lines))