Files
evotraders/backend/data/provider_router.py
cillin 3448667b79 feat: 微服务架构拆分和前后端优化
后端:
- 拆分出 agent_service, runtime_service, trading_service, news_service
- Gateway 模块化拆分 (gateway_*.py)
- 添加 domains/ 领域层
- 新增 control_client, runtime_client
- 更新 start-dev.sh 支持 split 服务模式

前端:
- 完善 API 服务层 (newsApi, tradingApi)
- 更新 vite.config.js
- Explain 组件优化

测试:
- 添加多个服务 app 测试

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-23 17:45:39 +08:00

911 lines
30 KiB
Python

# -*- coding: utf-8 -*-
"""Unified data provider router with fallback support."""
import datetime
import logging
from pathlib import Path
from typing import Callable, Optional
import finnhub
import pandas as pd
import yfinance as yf
from backend.config.data_config import DataSource, get_data_sources
from shared.schema import (
CompanyFactsResponse,
CompanyNews,
CompanyNewsResponse,
FinancialMetrics,
FinancialMetricsResponse,
InsiderTrade,
InsiderTradeResponse,
LineItem,
LineItemResponse,
Price,
PriceResponse,
)
logger = logging.getLogger(__name__)
_DATA_DIR = Path(__file__).parent / "ret_data"
def _format_provider_error(exc: Exception) -> str:
"""Condense common provider failures into short, readable messages."""
message = str(exc).strip().replace("\n", " ")
if "429" in message:
return "rate limit reached"
if "402" in message:
return "insufficient credits"
if "422" in message or "Missing parameters" in message:
return "invalid request parameters"
if "Quote not found" in message:
return "quote not found"
return message
def _has_valid_ticker(ticker: str) -> bool:
"""Return whether the normalized ticker is non-empty."""
return bool((ticker or "").strip())
class DataProviderRouter:
"""Route data requests across configured providers with fallbacks."""
def __init__(self):
self.sources = get_data_sources()
self._usage = {
"preferred": list(self.sources),
"last_success": {},
}
self._listeners: list[Callable[[dict], None]] = []
def price_sources(self) -> list[DataSource]:
"""Price lookup order, always allowing local CSV fallback."""
return self.sources
def api_sources(self) -> list[DataSource]:
"""Providers that can serve network-backed data."""
return [source for source in self.sources if source != "local_csv"]
def get_prices(
self,
ticker: str,
start_date: str,
end_date: str,
) -> tuple[list[Price], DataSource]:
"""Fetch prices using preferred providers with fallback."""
if not _has_valid_ticker(ticker):
return [], "local_csv"
last_error: Optional[Exception] = None
for source in self.price_sources():
try:
if source == "finnhub":
prices = _fetch_finnhub_prices(ticker, start_date, end_date)
self._record_success("prices", source)
return prices, source
if source == "financial_datasets":
prices = _fetch_fd_prices(ticker, start_date, end_date)
self._record_success("prices", source)
return prices, source
if source == "yfinance":
prices = _fetch_yfinance_prices(ticker, start_date, end_date)
self._record_success("prices", source)
return prices, source
prices = _fetch_local_prices(ticker, start_date, end_date)
if prices:
self._record_success("prices", source)
return prices, source
except Exception as exc:
last_error = exc
logger.warning(
"Price source %s failed for %s: %s",
source,
ticker,
_format_provider_error(exc),
)
if last_error:
raise last_error
return [], "local_csv"
def get_financial_metrics(
self,
ticker: str,
end_date: str,
period: str = "ttm",
limit: int = 10,
) -> tuple[list[FinancialMetrics], DataSource]:
"""Fetch financial metrics with API provider fallback."""
if not _has_valid_ticker(ticker):
return [], "local_csv"
last_error: Optional[Exception] = None
for source in self.api_sources():
try:
if source == "finnhub":
metrics = _fetch_finnhub_financial_metrics(
ticker,
end_date,
period,
)
self._record_success("financial_metrics", source)
return metrics, source
if source == "yfinance":
metrics = _fetch_yfinance_financial_metrics(
ticker,
end_date,
period,
)
self._record_success("financial_metrics", source)
return metrics, source
metrics = _fetch_fd_financial_metrics(
ticker,
end_date,
period,
limit,
)
self._record_success("financial_metrics", source)
return metrics, source
except Exception as exc:
last_error = exc
logger.warning(
"Financial metrics source %s failed for %s: %s",
source,
ticker,
_format_provider_error(exc),
)
if last_error:
raise last_error
return [], "local_csv"
def search_line_items(
self,
ticker: str,
line_items: list[str],
end_date: str,
period: str = "ttm",
limit: int = 10,
) -> list[LineItem]:
"""Line items are only supported via Financial Datasets."""
if not _has_valid_ticker(ticker):
return []
if "financial_datasets" not in self.api_sources():
return []
try:
results = _fetch_fd_line_items(
ticker=ticker,
line_items=line_items,
end_date=end_date,
period=period,
limit=limit,
)
self._record_success("line_items", "financial_datasets")
return results
except Exception as exc:
logger.warning(
"Line items source failed for %s: %s",
ticker,
_format_provider_error(exc),
)
return []
def get_insider_trades(
self,
ticker: str,
end_date: str,
start_date: Optional[str] = None,
limit: int = 1000,
) -> tuple[list[InsiderTrade], DataSource]:
"""Fetch insider trades with provider fallback."""
if not _has_valid_ticker(ticker):
return [], "local_csv"
last_error: Optional[Exception] = None
for source in self.api_sources():
try:
if source == "finnhub":
trades = _fetch_finnhub_insider_trades(
ticker,
start_date,
end_date,
limit,
)
self._record_success("insider_trades", source)
return trades, source
trades = _fetch_fd_insider_trades(
ticker,
start_date,
end_date,
limit,
)
self._record_success("insider_trades", source)
return trades, source
except Exception as exc:
last_error = exc
logger.warning(
"Insider trades source %s failed for %s: %s",
source,
ticker,
_format_provider_error(exc),
)
if last_error:
raise last_error
return [], "local_csv"
def get_company_news(
self,
ticker: str,
end_date: str,
start_date: Optional[str] = None,
limit: int = 1000,
) -> tuple[list[CompanyNews], DataSource]:
"""Fetch company news with provider fallback."""
if not _has_valid_ticker(ticker):
return [], "local_csv"
last_error: Optional[Exception] = None
for source in self.api_sources():
try:
if source == "finnhub":
news = _fetch_finnhub_company_news(
ticker,
start_date,
end_date,
limit,
)
self._record_success("company_news", source)
return news, source
if source == "yfinance":
news = _fetch_yfinance_company_news(
ticker,
start_date,
end_date,
limit,
)
self._record_success("company_news", source)
return news, source
news = _fetch_fd_company_news(
ticker,
start_date,
end_date,
limit,
)
self._record_success("company_news", source)
return news, source
except Exception as exc:
last_error = exc
logger.warning(
"Company news source %s failed for %s: %s",
source,
ticker,
_format_provider_error(exc),
)
if last_error:
raise last_error
return [], "local_csv"
def get_market_cap(
self,
ticker: str,
end_date: str,
metrics_lookup,
) -> tuple[Optional[float], DataSource]:
"""Fetch market cap using facts API or financial metrics fallback."""
if not _has_valid_ticker(ticker):
return None, "local_csv"
today = datetime.datetime.now().strftime("%Y-%m-%d")
if end_date == today and "financial_datasets" in self.api_sources():
try:
self._record_success("market_cap", "financial_datasets")
return _fetch_fd_market_cap_today(ticker), "financial_datasets"
except Exception as exc:
logger.warning(
"Market cap facts source failed for %s: %s",
ticker,
_format_provider_error(exc),
)
metrics, source = metrics_lookup(ticker, end_date)
if not metrics:
return None, source
market_cap = metrics[0].market_cap
if market_cap is None:
return None, source
if source == "finnhub":
self._record_success("market_cap", source)
return market_cap * 1_000_000, source
self._record_success("market_cap", source)
return market_cap, source
def get_usage_snapshot(self) -> dict:
"""Return provider usage metadata for UI/debugging."""
return {
"preferred": list(self._usage["preferred"]),
"last_success": dict(self._usage["last_success"]),
}
def add_listener(self, listener: Callable[[dict], None]) -> None:
"""Register a callback for provider usage changes."""
if listener not in self._listeners:
self._listeners.append(listener)
def remove_listener(self, listener: Callable[[dict], None]) -> None:
"""Remove a previously registered listener."""
if listener in self._listeners:
self._listeners.remove(listener)
def load_local_price_frame(
self,
ticker: str,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
) -> pd.DataFrame:
"""Load local CSV prices as a DataFrame for backtest managers."""
csv_path = _DATA_DIR / f"{ticker}.csv"
if not csv_path.exists():
return pd.DataFrame()
df = pd.read_csv(csv_path)
if df.empty or "time" not in df.columns:
return pd.DataFrame()
df["time"] = pd.to_datetime(df["time"])
if start_date:
df = df[df["time"] >= pd.to_datetime(start_date)]
if end_date:
df = df[df["time"] <= pd.to_datetime(end_date)]
if df.empty:
return pd.DataFrame()
df["Date"] = pd.to_datetime(df["time"])
df.set_index("Date", inplace=True)
df.sort_index(inplace=True)
self._record_success("historical_prices", "local_csv")
return df
def _record_success(self, data_type: str, source: DataSource) -> None:
previous = self._usage["last_success"].get(data_type)
self._usage["last_success"][data_type] = source
if previous != source:
snapshot = self.get_usage_snapshot()
for listener in list(self._listeners):
try:
listener(snapshot)
except Exception as exc:
logger.warning("Provider listener failed: %s", exc)
_router_instance: Optional[DataProviderRouter] = None
def get_provider_router() -> DataProviderRouter:
"""Return a shared provider router instance."""
global _router_instance
if _router_instance is None:
_router_instance = DataProviderRouter()
return _router_instance
def _get_finnhub_client() -> finnhub.Client:
api_key = _env_required("FINNHUB_API_KEY")
return finnhub.Client(api_key=api_key)
def _env_required(key: str) -> str:
import os
value = os.getenv(key, "").strip()
if not value:
raise ValueError(f"Missing required API key: {key}")
return value
def _make_api_request(url: str, headers: dict, method: str = "GET", json_data: dict = None):
import requests
response = (
requests.post(url, headers=headers, json=json_data)
if method.upper() == "POST"
else requests.get(url, headers=headers)
)
if response.status_code != 200:
raise ValueError(f"{response.status_code} - {response.text}")
return response
def _fetch_local_prices(
ticker: str,
start_date: str,
end_date: str,
) -> list[Price]:
csv_path = _DATA_DIR / f"{ticker}.csv"
if not csv_path.exists():
return []
df = pd.read_csv(csv_path)
if df.empty or "time" not in df.columns:
return []
df["time"] = pd.to_datetime(df["time"])
start = pd.to_datetime(start_date)
end = pd.to_datetime(end_date)
df = df[(df["time"] >= start) & (df["time"] <= end)].copy()
if df.empty:
return []
return [
Price(
open=float(row["open"]),
close=float(row["close"]),
high=float(row["high"]),
low=float(row["low"]),
volume=int(float(row["volume"])),
time=row["time"].strftime("%Y-%m-%d"),
)
for _, row in df.iterrows()
]
def _fetch_finnhub_prices(
ticker: str,
start_date: str,
end_date: str,
) -> list[Price]:
client = _get_finnhub_client()
start_timestamp = int(
datetime.datetime.strptime(start_date, "%Y-%m-%d").timestamp(),
)
end_timestamp = int(
(
datetime.datetime.strptime(end_date, "%Y-%m-%d")
+ datetime.timedelta(days=1)
).timestamp(),
)
candles = client.stock_candles(ticker, "D", start_timestamp, end_timestamp)
return [
Price(
open=candles["o"][i],
close=candles["c"][i],
high=candles["h"][i],
low=candles["l"][i],
volume=int(candles["v"][i]),
time=datetime.datetime.fromtimestamp(candles["t"][i]).strftime(
"%Y-%m-%d",
),
)
for i in range(len(candles.get("t", [])))
]
def _fetch_yfinance_prices(
ticker: str,
start_date: str,
end_date: str,
) -> list[Price]:
history = yf.Ticker(ticker).history(
start=start_date,
end=(
datetime.datetime.strptime(end_date, "%Y-%m-%d")
+ datetime.timedelta(days=1)
).strftime("%Y-%m-%d"),
auto_adjust=False,
actions=False,
)
if history.empty:
return []
history = history.reset_index()
date_column = "Date" if "Date" in history.columns else history.columns[0]
return [
Price(
open=float(row["Open"]),
close=float(row["Close"]),
high=float(row["High"]),
low=float(row["Low"]),
volume=int(float(row["Volume"])),
time=pd.to_datetime(row[date_column]).strftime("%Y-%m-%d"),
)
for _, row in history.iterrows()
]
def _fetch_fd_prices(
ticker: str,
start_date: str,
end_date: str,
) -> list[Price]:
headers = {"X-API-KEY": _env_required("FINANCIAL_DATASETS_API_KEY")}
url = (
"https://api.financialdatasets.ai/prices/"
f"?ticker={ticker}&interval=day&interval_multiplier=1"
f"&start_date={start_date}&end_date={end_date}"
)
response = _make_api_request(url, headers)
return PriceResponse(**response.json()).prices
def _fetch_finnhub_financial_metrics(
ticker: str,
end_date: str,
period: str,
) -> list[FinancialMetrics]:
client = _get_finnhub_client()
financials = client.company_basic_financials(ticker, "all")
metric_data = financials.get("metric", {})
if not metric_data:
return []
return [_map_finnhub_metrics(ticker, end_date, period, metric_data)]
def _fetch_fd_financial_metrics(
ticker: str,
end_date: str,
period: str,
limit: int,
) -> list[FinancialMetrics]:
headers = {"X-API-KEY": _env_required("FINANCIAL_DATASETS_API_KEY")}
url = (
"https://api.financialdatasets.ai/financial-metrics/"
f"?ticker={ticker}&report_period_lte={end_date}&limit={limit}&period={period}"
)
response = _make_api_request(url, headers)
return FinancialMetricsResponse(**response.json()).financial_metrics
def _fetch_yfinance_financial_metrics(
ticker: str,
end_date: str,
period: str,
) -> list[FinancialMetrics]:
info = yf.Ticker(ticker).info or {}
shares_outstanding = _coerce_float(info.get("sharesOutstanding"))
free_cashflow = _coerce_float(info.get("freeCashflow"))
return [
FinancialMetrics(
ticker=ticker,
report_period=end_date,
period=period,
currency=str(info.get("currency") or "USD"),
market_cap=_coerce_float(info.get("marketCap")),
enterprise_value=_coerce_float(info.get("enterpriseValue")),
price_to_earnings_ratio=_coerce_float(info.get("trailingPE")),
price_to_book_ratio=_coerce_float(info.get("priceToBook")),
price_to_sales_ratio=_coerce_float(
info.get("priceToSalesTrailing12Months"),
),
enterprise_value_to_ebitda_ratio=_coerce_float(
info.get("enterpriseToEbitda"),
),
enterprise_value_to_revenue_ratio=_coerce_float(
info.get("enterpriseToRevenue"),
),
free_cash_flow_yield=_ratio_or_none(free_cashflow, info.get("marketCap")),
peg_ratio=_coerce_float(info.get("pegRatio")),
gross_margin=_coerce_float(info.get("grossMargins")),
operating_margin=_coerce_float(info.get("operatingMargins")),
net_margin=_coerce_float(info.get("profitMargins")),
return_on_equity=_coerce_float(info.get("returnOnEquity")),
return_on_assets=_coerce_float(info.get("returnOnAssets")),
return_on_invested_capital=None,
asset_turnover=None,
inventory_turnover=None,
receivables_turnover=None,
days_sales_outstanding=None,
operating_cycle=None,
working_capital_turnover=None,
current_ratio=_coerce_float(info.get("currentRatio")),
quick_ratio=_coerce_float(info.get("quickRatio")),
cash_ratio=None,
operating_cash_flow_ratio=None,
debt_to_equity=_coerce_float(info.get("debtToEquity")),
debt_to_assets=None,
interest_coverage=None,
revenue_growth=_coerce_float(info.get("revenueGrowth")),
earnings_growth=_coerce_float(
info.get("earningsGrowth") or info.get("earningsQuarterlyGrowth"),
),
book_value_growth=None,
earnings_per_share_growth=_coerce_float(
info.get("earningsQuarterlyGrowth"),
),
free_cash_flow_growth=None,
operating_income_growth=None,
ebitda_growth=None,
payout_ratio=_coerce_float(info.get("payoutRatio")),
earnings_per_share=_coerce_float(info.get("trailingEps")),
book_value_per_share=_coerce_float(info.get("bookValue")),
free_cash_flow_per_share=_ratio_or_none(free_cashflow, shares_outstanding),
),
]
def _fetch_fd_line_items(
ticker: str,
line_items: list[str],
end_date: str,
period: str,
limit: int,
) -> list[LineItem]:
headers = {"X-API-KEY": _env_required("FINANCIAL_DATASETS_API_KEY")}
body = {
"tickers": [ticker],
"line_items": line_items,
"end_date": end_date,
"period": period,
"limit": limit,
}
response = _make_api_request(
"https://api.financialdatasets.ai/financials/search/line-items",
headers,
method="POST",
json_data=body,
)
return LineItemResponse(**response.json()).search_results[:limit]
def _fetch_finnhub_insider_trades(
ticker: str,
start_date: Optional[str],
end_date: str,
limit: int,
) -> list[InsiderTrade]:
client = _get_finnhub_client()
from_date = start_date or (
datetime.datetime.strptime(end_date, "%Y-%m-%d")
- datetime.timedelta(days=365)
).strftime("%Y-%m-%d")
insider_data = client.stock_insider_transactions(ticker, from_date, end_date)
return [
_convert_finnhub_insider_trade(ticker, trade)
for trade in insider_data.get("data", [])[:limit]
]
def _fetch_yfinance_company_news(
ticker: str,
start_date: Optional[str],
end_date: str,
limit: int,
) -> list[CompanyNews]:
news_items = getattr(yf.Ticker(ticker), "news", None) or []
start_bound = _normalize_timestamp(pd.to_datetime(start_date)) if start_date else None
end_bound = _normalize_timestamp(pd.to_datetime(end_date))
results: list[CompanyNews] = []
for item in news_items:
content = item.get("content", item)
published = (
content.get("pubDate")
or content.get("displayTime")
or item.get("providerPublishTime")
)
published_dt = _normalize_timestamp(_parse_news_datetime(published))
if published_dt is not None and published_dt > end_bound:
continue
if start_bound is not None and published_dt is not None and published_dt < start_bound:
continue
url = (
_nested_get(content, "canonicalUrl", "url")
or content.get("clickThroughUrl")
or content.get("url")
or item.get("link")
)
title = content.get("title") or item.get("title")
if not title or not url:
continue
results.append(
CompanyNews(
category=content.get("contentType") or item.get("type"),
ticker=ticker,
title=title,
related=item.get("relatedTickers", [ticker])[0]
if item.get("relatedTickers")
else ticker,
source=_nested_get(content, "provider", "displayName")
or item.get("publisher")
or "Yahoo Finance",
date=published_dt.strftime("%Y-%m-%d") if published_dt else None,
url=url,
summary=content.get("summary") or item.get("summary"),
),
)
if len(results) >= limit:
break
return results
def _map_finnhub_metrics(
ticker: str,
end_date: str,
period: str,
metric_data: dict,
) -> FinancialMetrics:
"""Map Finnhub metric data to FinancialMetrics model."""
return FinancialMetrics(
ticker=ticker,
report_period=end_date,
period=period,
currency="USD",
market_cap=metric_data.get("marketCapitalization"),
enterprise_value=None,
price_to_earnings_ratio=metric_data.get("peBasicExclExtraTTM"),
price_to_book_ratio=metric_data.get("pbAnnual"),
price_to_sales_ratio=metric_data.get("psAnnual"),
enterprise_value_to_ebitda_ratio=None,
enterprise_value_to_revenue_ratio=None,
free_cash_flow_yield=None,
peg_ratio=None,
gross_margin=metric_data.get("grossMarginTTM"),
operating_margin=metric_data.get("operatingMarginTTM"),
net_margin=metric_data.get("netProfitMarginTTM"),
return_on_equity=metric_data.get("roeTTM"),
return_on_assets=metric_data.get("roaTTM"),
return_on_invested_capital=metric_data.get("roicTTM"),
asset_turnover=metric_data.get("assetTurnoverTTM"),
inventory_turnover=metric_data.get("inventoryTurnoverTTM"),
receivables_turnover=metric_data.get("receivablesTurnoverTTM"),
days_sales_outstanding=None,
operating_cycle=None,
working_capital_turnover=None,
current_ratio=metric_data.get("currentRatioAnnual"),
quick_ratio=metric_data.get("quickRatioAnnual"),
cash_ratio=None,
operating_cash_flow_ratio=None,
debt_to_equity=metric_data.get("totalDebt/totalEquityAnnual"),
debt_to_assets=None,
interest_coverage=None,
revenue_growth=metric_data.get("revenueGrowthTTMYoy"),
earnings_growth=None,
book_value_growth=None,
earnings_per_share_growth=metric_data.get("epsGrowthTTMYoy"),
free_cash_flow_growth=None,
operating_income_growth=None,
ebitda_growth=None,
payout_ratio=metric_data.get("payoutRatioAnnual"),
earnings_per_share=metric_data.get("epsBasicExclExtraItemsTTM"),
book_value_per_share=metric_data.get("bookValuePerShareAnnual"),
free_cash_flow_per_share=None,
)
def _coerce_float(value) -> Optional[float]:
try:
if value is None:
return None
return float(value)
except (TypeError, ValueError):
return None
def _ratio_or_none(numerator, denominator) -> Optional[float]:
top = _coerce_float(numerator)
bottom = _coerce_float(denominator)
if top is None or bottom in (None, 0.0):
return None
return top / bottom
def _nested_get(payload: dict, *keys: str):
current = payload
for key in keys:
if not isinstance(current, dict):
return None
current = current.get(key)
return current
def _parse_news_datetime(value) -> Optional[pd.Timestamp]:
if value is None:
return None
try:
if isinstance(value, (int, float)):
return pd.to_datetime(int(value), unit="s")
return pd.to_datetime(value)
except (TypeError, ValueError):
return None
def _normalize_timestamp(value: Optional[pd.Timestamp]) -> Optional[pd.Timestamp]:
if value is None:
return None
if value.tzinfo is not None:
return value.tz_convert(None)
return value
def _convert_finnhub_insider_trade(ticker: str, trade: dict) -> InsiderTrade:
"""Convert Finnhub insider trade format to InsiderTrade model."""
shares_after = trade.get("share", 0)
change = trade.get("change", 0)
return InsiderTrade(
ticker=ticker,
issuer=None,
name=trade.get("name", ""),
title=None,
is_board_director=None,
transaction_date=trade.get("transactionDate", ""),
transaction_shares=abs(change),
transaction_price_per_share=trade.get("transactionPrice", 0.0),
transaction_value=abs(change) * trade.get("transactionPrice", 0.0),
shares_owned_before_transaction=(
shares_after - change if shares_after and change else None
),
shares_owned_after_transaction=float(shares_after)
if shares_after
else None,
security_title=None,
filing_date=trade.get("filingDate", ""),
)
def _fetch_fd_insider_trades(
ticker: str,
start_date: Optional[str],
end_date: str,
limit: int,
) -> list[InsiderTrade]:
headers = {"X-API-KEY": _env_required("FINANCIAL_DATASETS_API_KEY")}
url = f"https://api.financialdatasets.ai/insider-trades/?ticker={ticker}&filing_date_lte={end_date}"
if start_date:
url += f"&filing_date_gte={start_date}"
url += f"&limit={limit}"
response = _make_api_request(url, headers)
return InsiderTradeResponse(**response.json()).insider_trades
def _fetch_finnhub_company_news(
ticker: str,
start_date: Optional[str],
end_date: str,
limit: int,
) -> list[CompanyNews]:
client = _get_finnhub_client()
from_date = start_date or (
datetime.datetime.strptime(end_date, "%Y-%m-%d")
- datetime.timedelta(days=30)
).strftime("%Y-%m-%d")
news_data = client.company_news(ticker, _from=from_date, to=end_date)
return [
CompanyNews(
ticker=ticker,
title=news_item.get("headline", ""),
related=news_item.get("related", ""),
source=news_item.get("source", ""),
date=(
datetime.datetime.fromtimestamp(
news_item.get("datetime", 0),
datetime.timezone.utc,
).strftime("%Y-%m-%d")
if news_item.get("datetime")
else None
),
url=news_item.get("url", ""),
summary=news_item.get("summary", ""),
category=news_item.get("category", ""),
)
for news_item in news_data[:limit]
]
def _fetch_fd_company_news(
ticker: str,
start_date: Optional[str],
end_date: str,
limit: int,
) -> list[CompanyNews]:
headers = {"X-API-KEY": _env_required("FINANCIAL_DATASETS_API_KEY")}
url = f"https://api.financialdatasets.ai/news/?ticker={ticker}&end_date={end_date}&limit={limit}"
if start_date:
url += f"&start_date={start_date}"
response = _make_api_request(url, headers)
return CompanyNewsResponse(**response.json()).news
def _fetch_fd_market_cap_today(ticker: str) -> Optional[float]:
headers = {"X-API-KEY": _env_required("FINANCIAL_DATASETS_API_KEY")}
url = f"https://api.financialdatasets.ai/company/facts/?ticker={ticker}"
response = _make_api_request(url, headers)
return CompanyFactsResponse(**response.json()).company_facts.market_cap