# -*- coding: utf-8 -*-
"""Ingest Polygon market data into the long-lived research warehouse."""

from __future__ import annotations

from datetime import datetime, timedelta, timezone
from typing import Iterable

from backend.data.market_store import MarketStore
from backend.data.news_alignment import align_news_for_symbol
from backend.data.polygon_client import (
    fetch_news,
    fetch_ohlc,
    fetch_ticker_details,
)
from backend.data.provider_utils import normalize_symbol


def _today_utc() -> str:
    """Return today's UTC calendar date as an ISO ``YYYY-MM-DD`` string."""
    return datetime.now(timezone.utc).date().isoformat()


def _default_start(years: int = 2) -> str:
    """Return the ISO date ``years * 366`` days before today's UTC date.

    Using 366 days per year slightly over-covers the window so leap years
    never shorten it.
    """
    today = datetime.now(timezone.utc).date()
    return (today - timedelta(days=years * 366)).isoformat()


def _refresh_ticker_details(market_store: MarketStore, ticker: str) -> None:
    """Fetch ticker details and upsert the ticker row into the store."""
    details = fetch_ticker_details(ticker)
    market_store.upsert_ticker(
        symbol=ticker,
        name=details.get("name"),
        sector=details.get("sic_description"),
        is_active=bool(details.get("active", True)),
    )


def _start_after(watermark: str | None) -> str:
    """Return the ISO date one day after ``watermark``.

    Falls back to the default look-back start when no watermark is recorded.
    """
    if not watermark:
        return _default_start()
    next_day = datetime.fromisoformat(watermark) + timedelta(days=1)
    return next_day.date().isoformat()


def _next_watermark(
    previous: str | None,
    start: str,
    end: str,
    has_rows: bool,
) -> str | None:
    """Choose the watermark value to record after an incremental run.

    An empty window (``start > end``) means nothing was requested, so the
    previous watermark is written back unchanged instead of being moved to
    ``end``, which could lie before it. Otherwise the watermark advances to
    ``end`` when rows were fetched or a watermark already existed, and stays
    ``None`` when there is still nothing to record.
    """
    if start > end:
        return previous or None
    if has_rows or previous:
        return end
    return None


def ingest_ticker_history(
    symbol: str,
    *,
    start_date: str | None = None,
    end_date: str | None = None,
    store: MarketStore | None = None,
) -> dict:
    """Fetch and persist Polygon OHLC + news for a ticker.

    Args:
        symbol: Ticker symbol; passed through ``normalize_symbol``.
        start_date: First date to fetch; defaults to ``_default_start()``.
        end_date: Last date to fetch; defaults to today's UTC date.
        store: Store to write into; a new ``MarketStore`` is created if omitted.

    Returns:
        A summary dict with the symbol, the date range used, and the values
        returned by the price upsert, news upsert and news alignment calls.
    """
    ticker = normalize_symbol(symbol)
    start = start_date or _default_start()
    end = end_date or _today_utc()
    market_store = store or MarketStore()

    _refresh_ticker_details(market_store, ticker)

    ohlc_rows = fetch_ohlc(ticker, start, end)
    news_rows = fetch_news(ticker, start, end)
    price_count = market_store.upsert_ohlc(ticker, ohlc_rows, source="polygon")
    news_count = market_store.upsert_news(ticker, news_rows, source="polygon")
    aligned_count = align_news_for_symbol(market_store, ticker)
    market_store.update_fetch_watermark(
        symbol=ticker,
        price_date=end,
        news_date=end,
    )

    return {
        "symbol": ticker,
        "start_date": start,
        "end_date": end,
        "prices": price_count,
        "news": news_count,
        "aligned": aligned_count,
    }


def update_ticker_incremental(
    symbol: str,
    *,
    end_date: str | None = None,
    store: MarketStore | None = None,
) -> dict:
    """Incrementally fetch OHLC + news since the last watermark.

    Prices and news each resume from the day after their own watermark, or
    from ``_default_start()`` when no watermark is recorded. A feed whose
    start date is already past ``end`` is skipped entirely.

    Args:
        symbol: Ticker symbol; passed through ``normalize_symbol``.
        end_date: Last date to fetch; defaults to today's UTC date.
        store: Store to use; a new ``MarketStore`` is created if omitted.

    Returns:
        A summary dict with the symbol, both start dates, the end date, and
        the price, news and aligned counts (0 for a feed with no rows).
    """
    ticker = normalize_symbol(symbol)
    market_store = store or MarketStore()
    watermarks = market_store.get_ticker_watermarks(ticker)
    end = end_date or _today_utc()

    last_price_fetch = watermarks.get("last_price_fetch")
    last_news_fetch = watermarks.get("last_news_fetch")
    start_prices = _start_after(last_price_fetch)
    start_news = _start_after(last_news_fetch)

    _refresh_ticker_details(market_store, ticker)

    # ISO dates sort lexicographically, so string comparison is a date comparison.
    ohlc_rows = [] if start_prices > end else fetch_ohlc(ticker, start_prices, end)
    news_rows = [] if start_news > end else fetch_news(ticker, start_news, end)

    price_count = (
        market_store.upsert_ohlc(ticker, ohlc_rows, source="polygon")
        if ohlc_rows
        else 0
    )
    news_count = (
        market_store.upsert_news(ticker, news_rows, source="polygon")
        if news_rows
        else 0
    )
    aligned_count = align_news_for_symbol(market_store, ticker)

    # NOTE(review): None is passed when there is nothing to record; presumably
    # the store leaves that field untouched -- confirm against MarketStore.
    market_store.update_fetch_watermark(
        symbol=ticker,
        price_date=_next_watermark(
            last_price_fetch, start_prices, end, bool(ohlc_rows)
        ),
        news_date=_next_watermark(
            last_news_fetch, start_news, end, bool(news_rows)
        ),
    )

    return {
        "symbol": ticker,
        "start_price_date": start_prices,
        "start_news_date": start_news,
        "end_date": end,
        "prices": price_count,
        "news": news_count,
        "aligned": aligned_count,
    }


def ingest_symbols(
    symbols: Iterable[str],
    *,
    mode: str = "incremental",
    start_date: str | None = None,
    end_date: str | None = None,
    store: MarketStore | None = None,
) -> list[dict]:
    """Fetch Polygon data for a list of tickers.

    Args:
        symbols: Ticker symbols; ones that normalize to a falsy value are
            skipped.
        mode: ``"full"`` runs ``ingest_ticker_history``; any other value runs
            ``update_ticker_incremental``.
        start_date: Forwarded in ``"full"`` mode only; ignored otherwise.
        end_date: Forwarded to the per-ticker function in both modes.
        store: Store shared by all tickers; a new ``MarketStore`` is created
            if omitted.

    Returns:
        One summary dict per processed ticker, in input order.
    """
    market_store = store or MarketStore()
    results = []
    for symbol in symbols:
        ticker = normalize_symbol(symbol)
        if not ticker:
            continue
        if mode == "full":
            summary = ingest_ticker_history(
                ticker,
                start_date=start_date,
                end_date=end_date,
                store=market_store,
            )
        else:
            summary = update_ticker_incremental(
                ticker,
                end_date=end_date,
                store=market_store,
            )
        results.append(summary)
    return results