Files
evotraders/shared/client/trading_client.py

176 lines
5.4 KiB
Python

# -*- coding: utf-8 -*-
"""Trading service client for market data operations."""
import httpx
from shared.schema.price import PriceResponse
from shared.schema.financial import FinancialMetricsResponse, LineItemResponse
from shared.schema.market import InsiderTradeResponse
class TradingServiceClient:
"""Async client for the Trading Service API."""
def __init__(self, base_url: str = "http://localhost:8001"):
"""Initialize the client with a base URL.
Args:
base_url: Base URL for the trading service API.
"""
self.base_url = base_url.rstrip("/")
self._client: httpx.AsyncClient | None = None
async def __aenter__(self) -> "TradingServiceClient":
self._client = httpx.AsyncClient(
base_url=self.base_url,
timeout=60.0,
limits=httpx.Limits(max_connections=100, max_keepalive_connections=20)
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
if self._client:
await self._client.aclose()
async def get_prices(
self,
ticker: str,
start_date: str | None = None,
end_date: str | None = None,
) -> PriceResponse:
"""Get price data for a ticker.
Args:
ticker: Stock ticker symbol.
start_date: Start date (YYYY-MM-DD).
end_date: End date (YYYY-MM-DD).
Returns:
PriceResponse with price data.
"""
params = {"ticker": ticker}
if start_date:
params["start_date"] = start_date
if end_date:
params["end_date"] = end_date
response = await self._client.get("/api/prices", params=params)
response.raise_for_status()
return PriceResponse.model_validate(response.json())
async def get_news(
self,
ticker: str,
start_date: str | None = None,
end_date: str | None = None,
) -> dict:
"""Get news for a ticker.
Args:
ticker: Stock ticker symbol.
start_date: Start date (YYYY-MM-DD).
end_date: End date (YYYY-MM-DD).
Returns:
Dictionary with news data.
"""
params = {"ticker": ticker}
if start_date:
params["start_date"] = start_date
if end_date:
params["end_date"] = end_date
response = await self._client.get("/api/news", params=params)
response.raise_for_status()
return response.json()
async def get_financials(
self,
ticker: str,
period: str | None = None,
limit: int | None = None,
) -> FinancialMetricsResponse:
"""Get financial metrics for a ticker.
Args:
ticker: Stock ticker symbol.
period: Reporting period (e.g., "annual", "quarterly").
limit: Maximum number of records to return.
Returns:
FinancialMetricsResponse with financial data.
"""
params = {"ticker": ticker}
if period:
params["period"] = period
if limit:
params["limit"] = limit
response = await self._client.get("/api/financials", params=params)
response.raise_for_status()
return FinancialMetricsResponse.model_validate(response.json())
async def get_insider_trades(
self,
ticker: str,
end_date: str | None = None,
start_date: str | None = None,
limit: int | None = None,
) -> InsiderTradeResponse:
"""Get insider trades for a ticker.
Args:
ticker: Stock ticker symbol.
limit: Maximum number of records to return.
Returns:
InsiderTradeResponse with insider trade data.
"""
params = {"ticker": ticker}
if start_date:
params["start_date"] = start_date
if end_date:
params["end_date"] = end_date
if limit:
params["limit"] = limit
response = await self._client.get("/api/insider-trades", params=params)
response.raise_for_status()
return InsiderTradeResponse.model_validate(response.json())
async def get_market_status(self) -> dict:
"""Get current market status.
Returns:
Dictionary with market status information.
"""
response = await self._client.get("/api/market/status")
response.raise_for_status()
return response.json()
async def get_market_cap(self, ticker: str, end_date: str) -> dict:
"""Get market cap for a ticker/date."""
response = await self._client.get(
"/api/market-cap",
params={"ticker": ticker, "end_date": end_date},
)
response.raise_for_status()
return response.json()
async def get_line_items(
self,
ticker: str,
line_items: list[str],
end_date: str,
period: str = "ttm",
limit: int = 10,
) -> LineItemResponse:
"""Get line-item search results for a ticker/date."""
params: list[tuple[str, str | int]] = [
("ticker", ticker),
("end_date", end_date),
("period", period),
("limit", limit),
]
for item in line_items:
params.append(("line_items", item))
response = await self._client.get("/api/line-items", params=params)
response.raise_for_status()
return LineItemResponse.model_validate(response.json())