diff --git a/backend/core/state_sync.py b/backend/core/state_sync.py index 8ad5976..6220f07 100644 --- a/backend/core/state_sync.py +++ b/backend/core/state_sync.py @@ -411,7 +411,9 @@ class StateSync: Useful for: frontend reconnection or restoring from saved state """ - feed_history = self._state.get("feed_history", []) + feed_history = self.storage.runtime_db.get_recent_feed_events( + limit=self.storage.max_feed_history, + ) or self._state.get("feed_history", []) # feed_history is newest-first, need to reverse for chronological replay # noqa: E501 for event in reversed(feed_history): @@ -434,13 +436,22 @@ class StateSync: Returns: Dictionary suitable for sending to frontend """ + feed_history = self.storage.runtime_db.get_recent_feed_events( + limit=self.storage.max_feed_history, + ) or self._state.get("feed_history", []) + last_day_history = self.storage.runtime_db.get_last_day_feed_events( + current_date=self._state.get("current_date"), + limit=self.storage.max_feed_history, + ) or self._state.get("last_day_history", []) + payload = { "server_mode": self._state.get("server_mode", "live"), "is_mock_mode": self._state.get("is_mock_mode", False), "is_backtest": self._state.get("is_backtest", False), "tickers": self._state.get("tickers"), "runtime_config": self._state.get("runtime_config"), - "feed_history": self._state.get("feed_history", []), + "feed_history": feed_history, + "last_day_history": last_day_history, "current_date": self._state.get("current_date"), "trading_days_total": self._state.get("trading_days_total", 0), "trading_days_completed": self._state.get( diff --git a/backend/services/runtime_db.py b/backend/services/runtime_db.py index e160ae7..a9d69e2 100644 --- a/backend/services/runtime_db.py +++ b/backend/services/runtime_db.py @@ -136,6 +136,90 @@ class RuntimeDb: ), ) + def get_recent_feed_events( + self, + *, + limit: int = 200, + event_types: Optional[Iterable[str]] = None, + ) -> list[Dict[str, Any]]: + """Return recent persisted feed events in newest-first order.""" + event_types = tuple(event_types or ()) + sql = """ + SELECT payload_json + FROM events + """ + params: list[Any] = [] + if event_types: + placeholders = ",".join("?" for _ in event_types) + sql += f" WHERE event_type IN ({placeholders})" + params.extend(event_types) + sql += " ORDER BY timestamp DESC LIMIT ?" + params.append(max(1, int(limit))) + + with self._connect() as conn: + rows = conn.execute(sql, params).fetchall() + + items: list[Dict[str, Any]] = [] + for row in rows: + try: + payload = json.loads(row["payload_json"]) if row["payload_json"] else {} + except json.JSONDecodeError: + payload = {} + if payload: + items.append(payload) + return items + + def get_last_day_feed_events( + self, + *, + current_date: Optional[str] = None, + limit: int = 200, + event_types: Optional[Iterable[str]] = None, + ) -> list[Dict[str, Any]]: + """Return latest trading day events in newest-first order for replay.""" + event_types = tuple(event_types or ()) + target_date = str(current_date or "").strip() or None + + with self._connect() as conn: + if not target_date: + row = conn.execute( + """ + SELECT run_date + FROM events + WHERE run_date IS NOT NULL AND TRIM(run_date) != '' + ORDER BY run_date DESC + LIMIT 1 + """ + ).fetchone() + target_date = row["run_date"] if row else None + + if not target_date: + return [] + + sql = """ + SELECT payload_json + FROM events + WHERE run_date = ? + """ + params: list[Any] = [target_date] + if event_types: + placeholders = ",".join("?" for _ in event_types) + sql += f" AND event_type IN ({placeholders})" + params.extend(event_types) + sql += " ORDER BY timestamp DESC LIMIT ?" + params.append(max(1, int(limit))) + rows = conn.execute(sql, params).fetchall() + + items: list[Dict[str, Any]] = [] + for row in rows: + try: + payload = json.loads(row["payload_json"]) if row["payload_json"] else {} + except json.JSONDecodeError: + payload = {} + if payload: + items.append(payload) + return items + def upsert_trade(self, trade: Dict[str, Any]): payload = dict(trade or {}) if not payload: