Files
evotraders/backend/tools/data_tools.py

277 lines
8.4 KiB
Python

# -*- coding: utf-8 -*-
# flake8: noqa: E501
# pylint: disable=C0301
"""Data fetching tools backed by the unified provider router."""
import datetime
import pandas as pd
import pandas_market_calendars as mcal
from backend.data.provider_utils import normalize_symbol
from backend.data.cache import get_cache
from backend.data.provider_router import get_provider_router
from backend.data.schema import (
CompanyNews,
FinancialMetrics,
InsiderTrade,
LineItem,
Price,
)
from backend.utils.settlement import logger
# Module-level handles created once at import time and shared by the
# functions below: the data cache and the provider router.
_cache = get_cache()
_router = get_provider_router()
def get_last_tradeday(date: str) -> str:
    """
    Get the previous trading day for the specified date.

    The NYSE calendar is queried for all sessions in the 90 calendar days up
    to and including ``date``, and the latest session strictly before
    ``date`` is returned.

    Args:
        date: Date string (YYYY-MM-DD)

    Returns:
        Previous trading day date string (YYYY-MM-DD). If the calendar is
        unavailable or yields no sessions for the search window, the previous
        calendar day is returned instead.

    Raises:
        ValueError: If ``date`` cannot be parsed as YYYY-MM-DD.
    """
    current_date = datetime.datetime.strptime(date, "%Y-%m-%d")
    # Canonical zero-padded form, so an input such as "2024-1-5" still
    # matches the zero-padded strings produced from the calendar.
    date_key = current_date.strftime("%Y-%m-%d")
    # Previous calendar day: used to widen the search and as the last-resort
    # answer. Computing it up front avoids reading an unbound local on the
    # fallback path.
    prev_day = (current_date - datetime.timedelta(days=1)).strftime("%Y-%m-%d")

    nyse_calendar = mcal.get_calendar("NYSE")
    if nyse_calendar is None:
        return prev_day

    # Look back 90 days from the current date to collect trading days.
    start_key = (current_date - datetime.timedelta(days=90)).strftime("%Y-%m-%d")
    if hasattr(nyse_calendar, "valid_days"):
        # pandas_market_calendars
        trading_dates = nyse_calendar.valid_days(
            start_date=start_key,
            end_date=date_key,
        )
    else:
        # exchange_calendars
        trading_dates = nyse_calendar.sessions_in_range(start_key, date_key)

    trading_dates_list = [
        pd.Timestamp(d).strftime("%Y-%m-%d") for d in trading_dates
    ]

    if date_key in trading_dates_list:
        idx = trading_dates_list.index(date_key)
        if idx > 0:
            # The date is itself a session: return the one before it.
            return trading_dates_list[idx - 1]
        # The date is the first session in the window: search further back.
        return get_last_tradeday(prev_day)

    if trading_dates_list:
        # The date is not a session, so the latest session in the window is
        # already strictly earlier than it.
        return trading_dates_list[-1]

    return prev_day
def get_prices(
    ticker: str,
    start_date: str,
    end_date: str,
) -> list[Price]:
    """
    Return price data for a ticker, preferring cached results.

    The cache is probed once per source reported by the router's
    ``price_sources()``; the first non-empty hit is returned. On a miss the
    router is asked for the data, and a non-empty result is stored under a
    key that includes the source the router reports.

    Args:
        ticker: Stock ticker symbol (normalized before use)
        start_date: Start date (YYYY-MM-DD)
        end_date: End date (YYYY-MM-DD)

    Returns:
        list[Price]: Price objects, or an empty list when nothing was fetched
    """
    symbol = normalize_symbol(ticker)
    key_stem = f"{symbol}_{start_date}_{end_date}"

    for provider in _router.price_sources():
        cached_rows = _cache.get_prices(f"{key_stem}_{provider}")
        if cached_rows:
            return [Price(**row) for row in cached_rows]

    fetched, provider = _router.get_prices(symbol, start_date, end_date)
    if fetched:
        serialized = [item.model_dump() for item in fetched]
        _cache.set_prices(f"{key_stem}_{provider}", serialized)
        return fetched
    return []
def get_financial_metrics(
    ticker: str,
    end_date: str,
    period: str = "ttm",
    limit: int = 10,
) -> list[FinancialMetrics]:
    """
    Return financial metrics for a ticker, preferring cached results.

    The cache is probed once per source reported by the router's
    ``api_sources()``; the first non-empty hit is returned. On a miss the
    router is asked for the data, and a non-empty result is stored under a
    key that includes the source the router reports.

    Args:
        ticker: Stock ticker symbol (normalized before use)
        end_date: End date (YYYY-MM-DD)
        period: Period type (default: "ttm")
        limit: Number of records to fetch

    Returns:
        list[FinancialMetrics]: Metrics, or an empty list when nothing was fetched
    """
    symbol = normalize_symbol(ticker)
    key_stem = f"{symbol}_{period}_{end_date}_{limit}"

    for provider in _router.api_sources():
        cached_rows = _cache.get_financial_metrics(f"{key_stem}_{provider}")
        if cached_rows:
            return [FinancialMetrics(**row) for row in cached_rows]

    fetched, provider = _router.get_financial_metrics(
        ticker=symbol,
        end_date=end_date,
        period=period,
        limit=limit,
    )
    if fetched:
        serialized = [item.model_dump() for item in fetched]
        _cache.set_financial_metrics(f"{key_stem}_{provider}", serialized)
        return fetched
    return []
def search_line_items(
    ticker: str,
    line_items: list[str],
    end_date: str,
    period: str = "ttm",
    limit: int = 10,
) -> list[LineItem]:
    """
    Look up financial line items through the provider router.

    This is a best-effort call: any exception raised while normalizing the
    ticker or querying the router is logged and an empty list is returned,
    so callers can degrade gracefully.

    Args:
        ticker: Stock ticker symbol (normalized before use)
        line_items: Names of the line items to retrieve
        end_date: End date (YYYY-MM-DD)
        period: Period type (default: "ttm")
        limit: Number of records to fetch

    Returns:
        list[LineItem]: Whatever the router returns, or ``[]`` on error
    """
    try:
        # Rebind ``ticker`` so the log message below shows the normalized
        # symbol whenever normalization itself succeeded.
        ticker = normalize_symbol(ticker)
        found = _router.search_line_items(
            ticker=ticker,
            line_items=line_items,
            end_date=end_date,
            period=period,
            limit=limit,
        )
    except Exception as e:
        logger.info(
            f"Warning: Exception while fetching line items for {ticker}: {str(e)}",
        )
        return []
    return found
def get_insider_trades(
    ticker: str,
    end_date: str,
    start_date: str | None = None,
    limit: int = 1000,
) -> list[InsiderTrade]:
    """
    Return insider trades for a ticker, preferring cached results.

    The cache is probed once per source reported by the router's
    ``api_sources()``; the first non-empty hit is returned. On a miss the
    router is asked for the data, and a non-empty result is stored under a
    key that includes the source the router reports.

    Args:
        ticker: Stock ticker symbol (normalized before use)
        end_date: End date (YYYY-MM-DD)
        start_date: Optional start date; a missing value appears as "none"
            in the cache key
        limit: Number of records to fetch

    Returns:
        list[InsiderTrade]: Trades, or an empty list when nothing was fetched
    """
    symbol = normalize_symbol(ticker)
    key_stem = f"{symbol}_{start_date or 'none'}_{end_date}_{limit}"

    for provider in _router.api_sources():
        cached_rows = _cache.get_insider_trades(f"{key_stem}_{provider}")
        if cached_rows:
            return [InsiderTrade(**row) for row in cached_rows]

    fetched, provider = _router.get_insider_trades(
        ticker=symbol,
        end_date=end_date,
        start_date=start_date,
        limit=limit,
    )
    if fetched:
        serialized = [item.model_dump() for item in fetched]
        _cache.set_insider_trades(f"{key_stem}_{provider}", serialized)
        return fetched
    return []
def get_company_news(
    ticker: str,
    end_date: str,
    start_date: str | None = None,
    limit: int = 1000,
) -> list[CompanyNews]:
    """
    Return company news for a ticker, preferring cached results.

    The cache is probed once per source reported by the router's
    ``api_sources()``; the first non-empty hit is returned. On a miss the
    router is asked for the data, and a non-empty result is stored under a
    key that includes the source the router reports.

    Args:
        ticker: Stock ticker symbol (normalized before use)
        end_date: End date (YYYY-MM-DD)
        start_date: Optional start date; a missing value appears as "none"
            in the cache key
        limit: Number of records to fetch

    Returns:
        list[CompanyNews]: News items, or an empty list when nothing was fetched
    """
    symbol = normalize_symbol(ticker)
    key_stem = f"{symbol}_{start_date or 'none'}_{end_date}_{limit}"

    for provider in _router.api_sources():
        cached_rows = _cache.get_company_news(f"{key_stem}_{provider}")
        if cached_rows:
            return [CompanyNews(**row) for row in cached_rows]

    fetched, provider = _router.get_company_news(
        ticker=symbol,
        end_date=end_date,
        start_date=start_date,
        limit=limit,
    )
    if fetched:
        serialized = [item.model_dump() for item in fetched]
        _cache.set_company_news(f"{key_stem}_{provider}", serialized)
        return fetched
    return []
def get_market_cap(ticker: str, end_date: str) -> float | None:
    """
    Fetch the market cap for a ticker via the provider router.

    The router is handed a lookup callback that serves TTM financial metrics
    (limit 10) from the cache when present and otherwise falls through to the
    router's own ``get_financial_metrics``. The callback only reads the
    cache; it never writes to it.

    NOTE(review): the earlier docstring said Finnhub values are converted
    from millions. No conversion happens in this function, so it presumably
    lives in the router; confirm there.

    Args:
        ticker: Stock ticker symbol (normalized before use)
        end_date: End date (YYYY-MM-DD)

    Returns:
        The first element of the router's ``get_market_cap`` result
    """

    def _metrics_lookup(symbol: str, date: str):
        # Cache keys use the same layout as a ttm / limit=10 metrics request.
        for provider in _router.api_sources():
            cached_rows = _cache.get_financial_metrics(
                f"{symbol}_ttm_{date}_10_{provider}",
            )
            if cached_rows:
                metrics = [FinancialMetrics(**row) for row in cached_rows]
                return metrics, provider
        return _router.get_financial_metrics(
            ticker=symbol,
            end_date=date,
            period="ttm",
            limit=10,
        )

    result = _router.get_market_cap(
        ticker=normalize_symbol(ticker),
        end_date=end_date,
        metrics_lookup=_metrics_lookup,
    )
    market_cap, _ = result
    return market_cap
def prices_to_df(prices: list[Price]) -> pd.DataFrame:
    """
    Convert prices to a DataFrame indexed by date.

    Each price is dumped with ``model_dump()``. The ``time`` field is parsed
    into a datetime index named ``Date``, the OHLCV columns are coerced to
    numeric (unparseable values become NaN), and rows are sorted by date.

    Args:
        prices: Price objects; may be empty

    Returns:
        DataFrame sorted by its ``Date`` index. For empty input, an empty
        frame with the ``time`` and OHLCV columns and an empty ``Date``
        index is returned instead of raising ``KeyError``.
    """
    numeric_cols = ["open", "close", "high", "low", "volume"]

    if not prices:
        # A frame built from an empty list has no "time" column, so the
        # normal path would fail. Return an empty frame with the same shape
        # so callers can simply check ``df.empty``.
        empty = pd.DataFrame(
            columns=["time", *numeric_cols],
            index=pd.DatetimeIndex([], name="Date"),
        )
        return empty.astype({col: "float64" for col in numeric_cols})

    df = pd.DataFrame([p.model_dump() for p in prices])
    df["Date"] = pd.to_datetime(df["time"])
    df = df.set_index("Date")
    for col in numeric_cols:
        df[col] = pd.to_numeric(df[col], errors="coerce")
    return df.sort_index()