219 lines
7.9 KiB
Python
219 lines
7.9 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""Risk management tools for the risk manager agent."""
|
|
|
|
import json
|
|
from typing import Any, Dict, Iterable, List
|
|
|
|
from agentscope.message import TextBlock
|
|
from agentscope.tool import ToolResponse
|
|
|
|
|
|
def _to_text_response(text: str) -> ToolResponse:
|
|
return ToolResponse(content=[TextBlock(type="text", text=text)])
|
|
|
|
|
|
def _parse_object(payload: Any) -> Dict[str, Any]:
|
|
if payload is None:
|
|
return {}
|
|
if isinstance(payload, dict):
|
|
return payload
|
|
if isinstance(payload, str):
|
|
try:
|
|
parsed = json.loads(payload)
|
|
return parsed if isinstance(parsed, dict) else {}
|
|
except json.JSONDecodeError:
|
|
return {}
|
|
return {}
|
|
|
|
|
|
def _parse_prices(payload: Any) -> Dict[str, float]:
|
|
parsed = _parse_object(payload)
|
|
prices = {}
|
|
for ticker, value in parsed.items():
|
|
try:
|
|
prices[str(ticker)] = float(value)
|
|
except (TypeError, ValueError):
|
|
continue
|
|
return prices
|
|
|
|
|
|
def _iter_positions(
|
|
portfolio: Dict[str, Any],
|
|
prices: Dict[str, float],
|
|
) -> Iterable[Dict[str, Any]]:
|
|
positions = portfolio.get("positions", {})
|
|
for ticker, raw_position in positions.items():
|
|
if not isinstance(raw_position, dict):
|
|
continue
|
|
price = prices.get(ticker, 0.0)
|
|
long_qty = int(raw_position.get("long", 0) or 0)
|
|
short_qty = int(raw_position.get("short", 0) or 0)
|
|
long_value = long_qty * price
|
|
short_value = short_qty * price
|
|
net_value = long_value - short_value
|
|
gross_value = long_value + short_value
|
|
yield {
|
|
"ticker": ticker,
|
|
"price": price,
|
|
"long_qty": long_qty,
|
|
"short_qty": short_qty,
|
|
"long_value": long_value,
|
|
"short_value": short_value,
|
|
"net_value": net_value,
|
|
"gross_value": gross_value,
|
|
}
|
|
|
|
|
|
def _portfolio_equity(portfolio: Dict[str, Any], prices: Dict[str, float]) -> float:
|
|
cash = float(portfolio.get("cash", 0.0) or 0.0)
|
|
margin_used = float(portfolio.get("margin_used", 0.0) or 0.0)
|
|
total = cash + margin_used
|
|
for position in _iter_positions(portfolio, prices):
|
|
total += position["net_value"]
|
|
return total
|
|
|
|
|
|
def assess_position_concentration(
|
|
portfolio: Dict[str, Any] | str,
|
|
current_prices: Dict[str, float] | str,
|
|
) -> ToolResponse:
|
|
"""
|
|
Assess single-name concentration and gross exposure in the current portfolio.
|
|
|
|
Args:
|
|
portfolio: Portfolio state with cash, positions, and margin fields.
|
|
current_prices: Current price map by ticker.
|
|
"""
|
|
portfolio_obj = _parse_object(portfolio)
|
|
prices = _parse_prices(current_prices)
|
|
equity = _portfolio_equity(portfolio_obj, prices)
|
|
|
|
if equity <= 0:
|
|
return _to_text_response("Unable to assess concentration: portfolio equity is non-positive.")
|
|
|
|
exposures: List[Dict[str, Any]] = sorted(
|
|
_iter_positions(portfolio_obj, prices),
|
|
key=lambda item: abs(item["net_value"]),
|
|
reverse=True,
|
|
)
|
|
|
|
if not exposures:
|
|
return _to_text_response(
|
|
"No open positions. Concentration risk is low because the portfolio is fully in cash."
|
|
)
|
|
|
|
lines = ["=== Position Concentration Assessment ==="]
|
|
gross_exposure = sum(item["gross_value"] for item in exposures)
|
|
net_exposure = sum(item["net_value"] for item in exposures)
|
|
lines.append(f"Portfolio equity: ${equity:,.2f}")
|
|
lines.append(f"Gross exposure: ${gross_exposure:,.2f} ({gross_exposure / equity:.1%} of equity)")
|
|
lines.append(f"Net exposure: ${net_exposure:,.2f} ({net_exposure / equity:.1%} of equity)")
|
|
lines.append("Largest positions by net exposure:")
|
|
|
|
for item in exposures[:5]:
|
|
weight = item["net_value"] / equity
|
|
gross_weight = item["gross_value"] / equity
|
|
direction = "NET LONG" if item["net_value"] >= 0 else "NET SHORT"
|
|
lines.append(
|
|
f"- {item['ticker']}: {direction}, net ${item['net_value']:,.2f} ({weight:.1%}), "
|
|
f"gross ${item['gross_value']:,.2f} ({gross_weight:.1%})"
|
|
)
|
|
|
|
top_weight = abs(exposures[0]["net_value"]) / equity
|
|
if top_weight >= 0.30:
|
|
lines.append("Risk flag: concentration is HIGH because the largest single-name exposure exceeds 30% of equity.")
|
|
elif top_weight >= 0.20:
|
|
lines.append("Risk flag: concentration is MODERATE because the largest single-name exposure exceeds 20% of equity.")
|
|
else:
|
|
lines.append("Risk flag: concentration is currently contained at the single-name level.")
|
|
|
|
return _to_text_response("\n".join(lines))
|
|
|
|
|
|
def assess_margin_and_liquidity(
|
|
portfolio: Dict[str, Any] | str,
|
|
current_prices: Dict[str, float] | str,
|
|
) -> ToolResponse:
|
|
"""
|
|
Assess available cash, margin usage, and short exposure pressure.
|
|
|
|
Args:
|
|
portfolio: Portfolio state with cash, positions, and margin fields.
|
|
current_prices: Current price map by ticker.
|
|
"""
|
|
portfolio_obj = _parse_object(portfolio)
|
|
prices = _parse_prices(current_prices)
|
|
equity = _portfolio_equity(portfolio_obj, prices)
|
|
cash = float(portfolio_obj.get("cash", 0.0) or 0.0)
|
|
margin_used = float(portfolio_obj.get("margin_used", 0.0) or 0.0)
|
|
margin_requirement = float(portfolio_obj.get("margin_requirement", 0.0) or 0.0)
|
|
|
|
short_exposure = sum(item["short_value"] for item in _iter_positions(portfolio_obj, prices))
|
|
margin_buffer = cash - margin_used
|
|
|
|
lines = ["=== Margin And Liquidity Assessment ==="]
|
|
lines.append(f"Portfolio equity: ${equity:,.2f}")
|
|
lines.append(f"Cash available: ${cash:,.2f}")
|
|
lines.append(f"Margin used: ${margin_used:,.2f}")
|
|
lines.append(f"Margin requirement: {margin_requirement:.1%}")
|
|
lines.append(f"Short exposure: ${short_exposure:,.2f}")
|
|
lines.append(f"Margin buffer (cash - used): ${margin_buffer:,.2f}")
|
|
|
|
if equity > 0:
|
|
lines.append(f"Margin used / equity: {margin_used / equity:.1%}")
|
|
lines.append(f"Short exposure / equity: {short_exposure / equity:.1%}")
|
|
|
|
if margin_buffer < 0:
|
|
lines.append("Risk flag: HIGH. Margin usage exceeds available cash buffer.")
|
|
elif equity > 0 and margin_used / equity > 0.35:
|
|
lines.append("Risk flag: MODERATE to HIGH. Margin usage is above 35% of equity.")
|
|
else:
|
|
lines.append("Risk flag: margin pressure is currently manageable.")
|
|
|
|
return _to_text_response("\n".join(lines))
|
|
|
|
|
|
def assess_volatility_exposure(
|
|
tickers: List[str] | str,
|
|
current_date: str | None = None,
|
|
) -> ToolResponse:
|
|
"""
|
|
Assess per-ticker volatility and risk level for the current watchlist.
|
|
|
|
Args:
|
|
tickers: List of stock tickers or JSON list string.
|
|
current_date: Analysis date in YYYY-MM-DD format.
|
|
"""
|
|
from datetime import datetime, timedelta
|
|
|
|
from backend.tools.analysis_tools import _parse_tickers, _resolved_date
|
|
from backend.tools.data_tools import get_prices, prices_to_df
|
|
from backend.tools.technical_signals import StockTechnicalAnalyzer
|
|
|
|
tickers_list = _parse_tickers(tickers)
|
|
current_date = _resolved_date(current_date)
|
|
end_dt = datetime.strptime(current_date, "%Y-%m-%d")
|
|
start_date = (end_dt - timedelta(days=90)).strftime("%Y-%m-%d")
|
|
analyzer = StockTechnicalAnalyzer()
|
|
lines = [f"=== Volatility Exposure Assessment ({current_date}) ==="]
|
|
|
|
for ticker in tickers_list:
|
|
prices = get_prices(
|
|
ticker=ticker,
|
|
start_date=start_date,
|
|
end_date=current_date,
|
|
)
|
|
if not prices or len(prices) < 5:
|
|
lines.append(f"- {ticker}: insufficient price data")
|
|
continue
|
|
signal = analyzer.analyze(ticker=ticker, df=prices_to_df(prices))
|
|
lines.append(
|
|
f"- {ticker}: annualized volatility {signal.annualized_volatility_pct:.1f}%, "
|
|
f"RSI14 {signal.rsi14:.1f}, trend {signal.trend}, risk level {signal.risk_level}"
|
|
)
|
|
|
|
if len(lines) == 1:
|
|
lines.append("No tickers provided.")
|
|
|
|
return _to_text_response("\n".join(lines))
|