Files
evotraders/backend/gateway_server.py

165 lines
4.9 KiB
Python

# -*- coding: utf-8 -*-
"""Gateway Server - Entry point for the managed Gateway subprocess.
This module is launched by `runtime_service` when the runtime API is used to
spawn a run-scoped Gateway process.
"""
import argparse
import asyncio
import json
import logging
import sys
from contextlib import AsyncExitStack
from pathlib import Path
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
from backend.core.pipeline_runner import build_gateway_runtime_bundle
from backend.runtime.manager import (
set_global_runtime_manager,
clear_global_runtime_manager,
)
# Logger for this module; its level is set in configure_gateway_logging().
logger = logging.getLogger(__name__)

# Logger names that configure_gateway_logging() sets to INFO in non-verbose
# mode, while the root logger is configured at WARNING.
INFO_LOGGER_PREFIXES = (
    "backend.agents",
    "backend.core.pipeline",
    "backend.core.scheduler",
    "backend.services.gateway_cycle_support",
)

# Level applied to each named logger in non-verbose mode. In verbose mode
# configure_gateway_logging() sets these loggers to DEBUG instead.
NOISY_LOGGER_LEVELS = {
    # Third-party libraries.
    "aiohttp": logging.WARNING,
    "asyncio": logging.WARNING,
    "dashscope": logging.WARNING,
    "finnhub": logging.WARNING,
    "httpcore": logging.WARNING,
    "httpx": logging.WARNING,
    "urllib3": logging.WARNING,
    "websockets": logging.WARNING,
    "yfinance": logging.WARNING,
    # Project modules.
    "backend.data.polling_price_manager": logging.WARNING,
    "backend.services.gateway": logging.WARNING,
    "backend.services.market": logging.WARNING,
    "backend.services.storage": logging.WARNING,
}
class SuppressNoisyInfoFilter(logging.Filter):
    """Drop a few known low-signal library log lines; pass everything else.

    A record is dropped when either rule matches, whatever its level:

    * logger ``httpx`` and the rendered message starts with ``HTTP Request:``
    * a logger whose name starts with ``websockets`` and the rendered message
      contains ``connection open`` or ``opening handshake failed``
    """

    def filter(self, record: logging.LogRecord) -> bool:
        """Return ``False`` to drop *record*, ``True`` to let it through."""
        logger_name = record.name

        # Only render the message for loggers a rule applies to.
        # getMessage() %-formats msg with args, so rendering every record
        # would let a malformed log call anywhere in the process raise out
        # of this filter and into the code that issued the log call.
        if logger_name == "httpx":
            return not record.getMessage().startswith("HTTP Request:")

        if logger_name.startswith("websockets"):
            message = record.getMessage()
            return not (
                "connection open" in message
                or "opening handshake failed" in message
            )

        return True
def configure_gateway_logging(verbose: bool = False) -> None:
    """Set up process-wide logging for the gateway.

    The root logger is reconfigured (``force=True`` replaces any existing
    root handlers) at DEBUG when *verbose* is true and at WARNING otherwise.

    Verbose mode: every logger named in ``NOISY_LOGGER_LEVELS`` and this
    module's logger are set to DEBUG.

    Non-verbose mode: a ``SuppressNoisyInfoFilter`` is attached to each root
    handler, loggers in ``NOISY_LOGGER_LEVELS`` get their listed level,
    loggers in ``INFO_LOGGER_PREFIXES`` are set to INFO, and this module's
    logger is set to INFO.
    """
    logging.basicConfig(
        level=logging.DEBUG if verbose else logging.WARNING,
        format="%(asctime)s | %(levelname)-7s | %(name)s:%(lineno)d - %(message)s",
        force=True,
    )

    if verbose:
        for noisy_name in NOISY_LOGGER_LEVELS:
            logging.getLogger(noisy_name).setLevel(logging.DEBUG)
        logging.getLogger(__name__).setLevel(logging.DEBUG)
        return

    # One shared filter instance on every root handler.
    noise_filter = SuppressNoisyInfoFilter()
    for root_handler in logging.getLogger().handlers:
        root_handler.addFilter(noise_filter)

    for noisy_name, quiet_level in NOISY_LOGGER_LEVELS.items():
        logging.getLogger(noisy_name).setLevel(quiet_level)

    for info_name in INFO_LOGGER_PREFIXES:
        logging.getLogger(info_name).setLevel(logging.INFO)

    logging.getLogger(__name__).setLevel(logging.INFO)
async def run_gateway(
    run_id: str,
    run_dir: Path,
    bootstrap: dict,
    port: int
):
    """Build the runtime bundle for one run and start its gateway.

    Args:
        run_id: Identifier of the run, forwarded to the bundle builder.
        run_dir: Run directory, forwarded to the bundle builder.
        bootstrap: Bootstrap configuration. ``poll_interval`` is read from
            it (default 10) and converted with ``int``; the whole mapping is
            also forwarded to the bundle builder.
        port: Port passed to ``gateway.start`` (host is ``0.0.0.0``).

    The bundle's runtime manager is registered as the global runtime manager
    before the gateway starts. The global runtime manager is always cleared
    on exit, including when building the bundle fails or the task is
    cancelled. ``asyncio.CancelledError`` is logged and re-raised.
    """
    poll_seconds = int(bootstrap.get("poll_interval", 10))
    logger.info(f"[Gateway Server] Starting run {run_id} on port {port}")

    try:
        runtime_bundle = build_gateway_runtime_bundle(
            run_id=run_id,
            run_dir=run_dir,
            bootstrap=bootstrap,
            poll_interval=poll_seconds,
        )
        set_global_runtime_manager(runtime_bundle.runtime_manager)

        # Each long-term memory is entered as an async context manager and
        # stays entered while gateway.start() is awaited; the stack exits
        # them when this block is left.
        async with AsyncExitStack() as memory_stack:
            for long_term_memory in runtime_bundle.long_term_memories:
                await memory_stack.enter_async_context(long_term_memory)

            logger.info(f"[Gateway Server] Gateway starting on port {port}")
            await runtime_bundle.gateway.start(host="0.0.0.0", port=port)
    except asyncio.CancelledError:
        logger.info("[Gateway Server] Cancelled")
        raise
    finally:
        logger.info("[Gateway Server] Cleaning up")
        clear_global_runtime_manager()
def main(argv=None) -> None:
    """Parse command-line arguments and run the gateway until it exits.

    Args:
        argv: Optional list of argument strings. When ``None`` (the
            default), argparse reads ``sys.argv[1:]``, so existing callers
            that invoke ``main()`` with no arguments behave as before.

    Exits with status 1 when ``--bootstrap`` is not valid JSON, when it does
    not decode to a JSON object, or when the gateway raises an unexpected
    exception. A ``KeyboardInterrupt`` is logged and the function returns
    normally.
    """
    parser = argparse.ArgumentParser(description="Gateway Server")
    parser.add_argument("--run-id", required=True, help="Run identifier")
    parser.add_argument("--run-dir", required=True, help="Run directory path")
    parser.add_argument("--port", type=int, default=8765, help="WebSocket port")
    parser.add_argument("--bootstrap", required=True, help="Bootstrap config as JSON")
    parser.add_argument("--verbose", action="store_true", help="Verbose logging")
    args = parser.parse_args(argv)

    # Configure logging first so the validation errors below are reported
    # through the configured handlers.
    configure_gateway_logging(verbose=args.verbose)

    # Validate the bootstrap payload up front so a bad value produces one
    # clear error line instead of a raw traceback.
    try:
        bootstrap = json.loads(args.bootstrap)
    except json.JSONDecodeError as exc:
        logger.error("[Gateway Server] Invalid --bootstrap JSON: %s", exc)
        sys.exit(1)

    # run_gateway() calls bootstrap.get(...), so anything other than a JSON
    # object would otherwise fail later with an unrelated-looking error.
    if not isinstance(bootstrap, dict):
        logger.error(
            "[Gateway Server] --bootstrap must be a JSON object, got %s",
            type(bootstrap).__name__,
        )
        sys.exit(1)

    run_dir = Path(args.run_dir)

    try:
        asyncio.run(run_gateway(
            run_id=args.run_id,
            run_dir=run_dir,
            bootstrap=bootstrap,
            port=args.port
        ))
    except KeyboardInterrupt:
        logger.info("[Gateway Server] Interrupted by user")
    except Exception as e:
        # Top-level boundary: log with traceback and signal failure to the
        # parent process through the exit status.
        logger.exception(f"[Gateway Server] Fatal error: {e}")
        sys.exit(1)
# Run the CLI entry point only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()