# Listing metadata: 293 lines, 10 KiB, Python
# -*- coding: utf-8 -*-
"""
Polling-based Price Manager with provider-aware quote polling.

Supports Finnhub and yfinance for near real-time price fetching.
"""

import logging
import math
import threading
import time
from typing import Callable, Dict, List, Optional

import finnhub
import yfinance as yf

from backend.data.provider_utils import normalize_symbol

logger = logging.getLogger(__name__)

# After the first warning for a symbol, repeated fetch failures are logged at
# WARNING only on every Nth consecutive failure; the others go to DEBUG.
_SUPPRESSED_LOG_EVERY = 20


class PollingPriceManager:
    """Polling-based price manager using Finnhub or yfinance.

    A background daemon thread fetches a quote for every subscribed symbol
    once per ``poll_interval`` seconds, caches the latest and opening prices,
    and hands a price dictionary to each registered callback.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        poll_interval: int = 30,
        provider: str = "finnhub",
    ):
        """
        Args:
            api_key: Finnhub API Key
            poll_interval: Polling interval in seconds (default 30s)
            provider: Quote provider (`finnhub` or `yfinance`)
        """
        self.api_key = api_key
        self.poll_interval = poll_interval
        self.provider = provider
        # The Finnhub client is only built when it can actually be used;
        # _fetch_quote raises if it is needed but missing.
        self.finnhub_client = (
            finnhub.Client(api_key=api_key)
            if provider == "finnhub" and api_key
            else None
        )

        self.subscribed_symbols: List[str] = []
        self.latest_prices: Dict[str, float] = {}
        self.open_prices: Dict[str, float] = {}
        self.price_callbacks: List[Callable] = []
        # Consecutive fetch failures per symbol, used to throttle warnings.
        self._failure_counts: Dict[str, int] = {}

        self.running = False
        self._thread: Optional[threading.Thread] = None
        # Set by stop() so the polling thread wakes from its inter-poll wait
        # immediately instead of sleeping out the whole interval.
        self._stop_event = threading.Event()

        logger.info(
            "PollingPriceManager initialized "
            f"(provider: {provider}, interval: {poll_interval}s)",
        )

    def subscribe(self, symbols: List[str]):
        """Subscribe to stock symbols (each is normalized; duplicates are skipped)."""
        for symbol in symbols:
            symbol = normalize_symbol(symbol)
            if symbol not in self.subscribed_symbols:
                self.subscribed_symbols.append(symbol)
                logger.info(f"Subscribed to: {symbol}")

    def unsubscribe(self, symbols: List[str]):
        """Unsubscribe from symbols (unknown symbols are ignored)."""
        for symbol in symbols:
            symbol = normalize_symbol(symbol)
            if symbol in self.subscribed_symbols:
                self.subscribed_symbols.remove(symbol)
                logger.info(f"Unsubscribed: {symbol}")

    def add_price_callback(self, callback: Callable):
        """Add a price update callback; it is called with one price dict per quote."""
        self.price_callbacks.append(callback)

    def _fetch_prices(self):
        """Fetch latest prices for all subscribed stocks.

        A failure for one symbol is recorded and logged but never prevents the
        remaining symbols from being polled.
        """
        # Iterate over a snapshot: subscribe()/unsubscribe() may mutate the
        # list from another thread while a poll is in progress.
        for symbol in list(self.subscribed_symbols):
            try:
                self._poll_symbol(symbol)
            except Exception as e:
                self._record_failure(symbol, e)

    def _poll_symbol(self, symbol: str) -> None:
        """Fetch one quote, update the caches and notify the callbacks."""
        quote_data = self._fetch_quote(symbol)
        if not isinstance(quote_data, dict):
            raise ValueError(f"{symbol}: Empty quote payload")

        current_price = quote_data.get("c")
        open_price = quote_data.get("o")
        # Fall back to "now" when the provider omits the timestamp or sends
        # a null/zero value, so the multiplication below cannot fail.
        timestamp = quote_data.get("t") or int(time.time())

        # `not x > 0` (rather than `x <= 0`) also rejects NaN prices.
        if not current_price or not current_price > 0:
            logger.warning(f"{symbol}: Invalid price data")
            return

        # Store open price on first fetch
        if symbol not in self.open_prices and open_price and open_price > 0:
            self.open_prices[symbol] = open_price
            logger.info(f"{symbol} open price: ${open_price:.2f}")

        stored_open = self.open_prices.get(symbol, open_price)
        # The open price may be missing (None) or zero; report a 0% return in
        # that case instead of failing the whole quote.
        if stored_open and stored_open > 0:
            ret = ((current_price - stored_open) / stored_open) * 100
        else:
            ret = 0

        self.latest_prices[symbol] = current_price
        previous_failures = self._failure_counts.pop(symbol, 0)
        if previous_failures > 0:
            logger.info(
                "%s quote polling recovered after %d consecutive failures",
                symbol,
                previous_failures,
            )

        price_data = {
            "symbol": symbol,
            "price": current_price,
            "timestamp": timestamp * 1000,  # seconds -> milliseconds
            "open": stored_open,
            "high": quote_data.get("h"),
            "low": quote_data.get("l"),
            "previous_close": quote_data.get("pc"),
            "ret": ret,
            "change": quote_data.get("d"),
            "change_percent": quote_data.get("dp"),
        }

        for callback in self.price_callbacks:
            # One misbehaving callback must not starve the others.
            try:
                callback(price_data)
            except Exception as e:
                logger.error(f"Price callback error ({symbol}): {e}")

        logger.debug(
            f"{symbol}: ${current_price:.2f} [ret: {ret:+.2f}%]",
        )

    def _record_failure(self, symbol: str, error: Exception) -> None:
        """Count a failed fetch and log it, throttling repeated warnings."""
        failure_count = self._failure_counts.get(symbol, 0) + 1
        self._failure_counts[symbol] = failure_count
        message = f"Failed to fetch {symbol} price: {error}"

        if failure_count == 1:
            logger.warning(message)
        elif failure_count % _SUPPRESSED_LOG_EVERY == 0:
            logger.warning(
                "%s (repeated %d times; suppressing intermediate failures)",
                message,
                failure_count,
            )
        else:
            logger.debug(message)

    def _fetch_quote(self, symbol: str) -> Dict[str, float]:
        """Fetch a normalized quote payload from the configured provider.

        Raises:
            ValueError: If the Finnhub client is needed but was not created,
                or if Finnhub returns something other than a dict.
        """
        if self.provider == "yfinance":
            return self._fetch_yfinance_quote(symbol)
        if not self.finnhub_client:
            raise ValueError("Finnhub API key required for finnhub polling")
        quote = self.finnhub_client.quote(symbol)
        if not isinstance(quote, dict):
            raise ValueError(f"{symbol}: Invalid Finnhub quote payload")
        return quote

    def _fetch_yfinance_quote(self, symbol: str) -> Dict[str, float]:
        """Fetch quote data from yfinance and normalize to Finnhub-like keys.

        Values are read from ``Ticker.fast_info`` first. When no current price
        is available there, the 1-minute history of the current day is used
        instead.

        Returns:
            A dict with the keys ``c``, ``o``, ``h``, ``l``, ``pc``, ``d``,
            ``dp`` and ``t`` (``t`` is the local fetch time in seconds).

        Raises:
            ValueError: If no usable current price can be obtained.
        """
        ticker = yf.Ticker(symbol)
        fast_info = dict(getattr(ticker, "fast_info", {}) or {})

        current_price = _coerce_float(
            fast_info.get("lastPrice") or fast_info.get("regularMarketPrice"),
        )
        open_price = _coerce_float(
            fast_info.get("open") or fast_info.get("regularMarketOpen"),
        )
        previous_close = _coerce_float(
            fast_info.get("previousClose")
            or fast_info.get("regularMarketPreviousClose"),
        )
        high_price = _coerce_float(
            fast_info.get("dayHigh") or fast_info.get("regularMarketDayHigh"),
        )
        low_price = _coerce_float(
            fast_info.get("dayLow") or fast_info.get("regularMarketDayLow"),
        )

        if current_price is None:
            # fast_info had no price: fall back to intraday bars.
            history = ticker.history(period="1d", interval="1m", auto_adjust=False)
            if history is None:
                raise ValueError(f"{symbol}: yfinance returned no history frame")
            if history.empty:
                raise ValueError(f"{symbol}: No yfinance quote data")
            latest = history.iloc[-1]
            current_price = _coerce_float(latest.get("Close"))
            open_price = open_price or _coerce_float(history.iloc[0].get("Open"))
            high_price = high_price or _coerce_float(history["High"].max())
            low_price = low_price or _coerce_float(history["Low"].min())

        if current_price is None:
            raise ValueError(f"{symbol}: Invalid yfinance quote data")

        # Fill gaps so that every key of the payload carries a number.
        effective_open = open_price or previous_close or current_price
        effective_prev_close = previous_close or effective_open or current_price
        change = current_price - effective_prev_close
        change_percent = (
            (change / effective_prev_close) * 100 if effective_prev_close else 0.0
        )

        return {
            "c": current_price,
            "o": effective_open,
            "h": high_price or max(current_price, effective_open),
            "l": low_price or min(current_price, effective_open),
            "pc": effective_prev_close,
            "d": change,
            "dp": change_percent,
            "t": int(time.time()),
        }

    def _polling_loop(self):
        """Main polling loop; runs on the background thread until stopped."""
        logger.info(f"Price polling started (interval: {self.poll_interval}s)")

        while self.running:
            try:
                # Monotonic clock: immune to wall-clock adjustments.
                start_time = time.monotonic()
                self._fetch_prices()
                elapsed = time.monotonic() - start_time
                delay = max(0, self.poll_interval - elapsed)
            except Exception as e:
                logger.error(f"Polling loop error: {e}")
                delay = 5

            if delay > 0:
                # Unlike time.sleep(), this returns as soon as stop() sets
                # the event, so shutdown does not wait out the interval.
                self._stop_event.wait(delay)

    def start(self):
        """Start price polling on a daemon thread.

        Does nothing (besides logging a warning) when polling is already
        running or when no symbols are subscribed.
        """
        if self.running:
            logger.warning("Price polling already running")
            return

        if not self.subscribed_symbols:
            logger.warning("No stocks subscribed")
            return

        self._stop_event.clear()
        self.running = True
        self._thread = threading.Thread(target=self._polling_loop, daemon=True)
        self._thread.start()

        logger.info(
            f"Price polling started: {', '.join(self.subscribed_symbols)}",
        )

    def stop(self):
        """Stop price polling and wait up to 5 seconds for the thread to exit."""
        self.running = False
        self._stop_event.set()
        thread = self._thread
        # Joining the current thread raises RuntimeError; this happens when
        # stop() is called from inside a price callback.
        if thread and thread is not threading.current_thread():
            thread.join(timeout=5)
        logger.info("Price polling stopped")

    def get_latest_price(self, symbol: str) -> Optional[float]:
        """Get latest price for symbol, or None if none has been fetched."""
        return self.latest_prices.get(symbol)

    def get_all_latest_prices(self) -> Dict[str, float]:
        """Get a copy of all latest prices keyed by symbol."""
        return self.latest_prices.copy()

    def get_open_price(self, symbol: str) -> Optional[float]:
        """Get open price for symbol, or None if none has been stored."""
        return self.open_prices.get(symbol)

    def reset_open_prices(self):
        """Reset open prices for new trading day."""
        self.open_prices.clear()
        logger.info("Open prices reset")


def _coerce_float(value) -> Optional[float]:
|
|
try:
|
|
if value is None:
|
|
return None
|
|
return float(value)
|
|
except (TypeError, ValueError):
|
|
return None
|